Passed
Push — 2.x ( e94308...699219 )
by Jordi
09:28
created

bika.lims.api.is_temporary()   B

Complexity

Conditions 7

Size

Total Lines 25
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 25
rs 8
c 0
b 0
f 0
cc 7
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import re
23
from collections import OrderedDict
24
from datetime import datetime
25
from datetime import timedelta
26
from itertools import groupby
27
28
import six
29
30
import Missing
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from Acquisition import aq_base
33
from Acquisition import aq_inner
34
from Acquisition import aq_parent
35
from bika.lims import logger
36
from bika.lims.interfaces import IClient
37
from bika.lims.interfaces import IContact
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from DateTime.interfaces import DateTimeError
41
from plone import api as ploneapi
42
from plone.api.exc import InvalidParameterError
43
from plone.app.layout.viewlets.content import ContentHistoryView
44
from plone.behavior.interfaces import IBehaviorAssignable
45
from plone.dexterity.interfaces import IDexterityContent
46
from plone.dexterity.schema import SchemaInvalidatedEvent
47
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
48
from plone.i18n.normalizer.interfaces import IIDNormalizer
49
from plone.memoize.volatile import DontCache
50
from Products.Archetypes.atapi import DisplayList
51
from Products.Archetypes.BaseObject import BaseObject
52
from Products.Archetypes.event import ObjectInitializedEvent
53
from Products.Archetypes.utils import mapply
54
from Products.CMFCore.interfaces import IFolderish
55
from Products.CMFCore.interfaces import ISiteRoot
56
from Products.CMFCore.permissions import ModifyPortalContent
57
from Products.CMFCore.permissions import View
58
from Products.CMFCore.utils import getToolByName
59
from Products.CMFCore.WorkflowCore import WorkflowException
60
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
61
from Products.CMFPlone.utils import _createObjectByType
62
from Products.CMFPlone.utils import base_hasattr
63
from Products.CMFPlone.utils import safe_unicode
64
from Products.PlonePAS.tools.memberdata import MemberData
65
from Products.ZCatalog.interfaces import ICatalogBrain
66
from zope import globalrequest
67
from zope.annotation.interfaces import IAttributeAnnotatable
68
from zope.component import getUtility
69
from zope.component import queryMultiAdapter
70
from zope.component.interfaces import IFactory
71
from zope.event import notify
72
from zope.interface import directlyProvides
73
from zope.lifecycleevent import ObjectCreatedEvent
74
from zope.publisher.browser import TestRequest
75
from zope.schema import getFieldsInOrder
76
from zope.security.interfaces import Unauthorized
77
78
"""SENAITE LIMS Framework API
79
80
Please see bika.lims/docs/API.rst for documentation.
81
82
Architectural Notes:
83
84
Please add only functions that do a single thing for a single object.
85
86
Good: `def get_foo(brain_or_object)`
87
Bad:  `def get_foos(list_of_brain_objects)`
88
89
Why?
90
91
Because it makes things more complex. You can always use a pattern like this to
92
achieve the same::
93
94
    >>> foos = map(get_foo, list_of_brain_objects)
95
96
Please add for all of your functions a descriptive test in docs/API.rst.
97
98
Thanks.
99
"""
100
101
_marker = object()
102
103
UID_RX = re.compile("[a-z0-9]{32}$")
104
105
106
class APIError(Exception):
    """Exception type raised by the `fail` helper of this API module.

    Acts as the base class for bika.lims related errors.
    """
108
109
110
def get_portal():
    """Return the site object provided by `plone.api.portal.getSite`

    :returns: Portal object
    """
    portal = ploneapi.portal.getSite()
    return portal
116
117
118
def get_setup():
    """Look up the `bika_setup` folder in the portal root.

    :returns: Result of `portal.get("bika_setup")`
    """
    return get_portal().get("bika_setup")
123
124
125
def get_bika_setup():
    """Look up the `bika_setup` folder (delegates to `get_setup`).

    :returns: Same value as `get_setup()`
    """
    setup = get_setup()
    return setup
129
130
131
def get_senaite_setup():
    """Look up the new DX `setup` folder in the portal root.

    :returns: Result of `portal.get("setup")`
    """
    return get_portal().get("setup")
136
137
138
def create(container, portal_type, *args, **kwargs):
    """Create a new content object of the given type inside the container

    The construction steps follow
    `Products.CMFCore.TypesTool._constructInstance`.

    :param container: container where the new object is added
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param id: Optional keyword with the id for the new object; defaults to
        the value returned by `bika.lims.utils.tmpID()`
    :param title: Optional keyword with the title for the new object;
        defaults to "New <portal_type>"
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    obj_id = kwargs.pop("id", tmpID())
    obj_title = kwargs.pop("title", "New {}".format(portal_type))

    # look up the type information (fti) of the requested portal type
    fti = get_tool("portal_types").getTypeInfo(portal_type)

    if not fti.product:
        # no product set on the fti: instantiate through the named factory
        factory = getUtility(IFactory, fti.factory)
        obj = factory(obj_id, *args, **kwargs)
        if hasattr(obj, "_setPortalTypeName"):
            obj._setPortalTypeName(fti.getId())
        obj.title = safe_unicode(obj_title)
        # announce the creation before the object is put into the container
        notify(ObjectCreatedEvent(obj))
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(obj_id, obj)
        # fetch the object by its current id, because an event handler might
        # have renamed it already
        return container._getOb(obj.getId())

    # product set on the fti: create the AT object
    obj = _createObjectByType(portal_type, container, obj_id)
    # write the title and the remaining keyword values into the fields
    edit(obj, check_permissions=False, title=obj_title, **kwargs)
    # auto-id if required
    if obj._at_rename_after_creation:
        obj._renameAfterCreation(check_auto_id=True)
    # the object is no longer under creation
    obj.unmarkCreationFlag()
    # notify that the object was created
    notify(ObjectInitializedEvent(obj))
    return obj
191
192
193
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
    """Creates a copy of the source object. If container is None, creates the
    copy inside the same container as the source. If portal_type is specified,
    creates a new object of this type, and copies the values from source fields
    to the destination object. Field values sent as kwargs have priority over
    the field values from source.

    :param source: object from which create a copy
    :type source: ATContentType/DexterityContentType/CatalogBrain
    :param container: destination container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: destination portal type
    :param skip: Optional keyword with additional field names or field types
        that must not be copied over from source
    :type skip: list/tuple/set
    :returns: The new created object
    """
    # Imported here to prevent circular dependencies. The explicit relative
    # form resolves to the same sibling module as the former implicit
    # `from security import ...`, but also works on Python 3 (PEP 328)
    from .security import check_permission

    # Use same container as source unless explicitly set
    source = get_object(source)
    if not container:
        container = get_parent(source)

    # Use same portal type as source unless explicitly set
    if not portal_type:
        portal_type = get_portal_type(source)

    # Extend the fields to skip with defaults. A set gives constant-time
    # membership checks; `skip=None` is treated like "nothing extra to skip"
    skip = set(kwargs.pop("skip", None) or [])
    skip.update([
        "Products.Archetypes.Field.ComputedField",
        "UID",
        "id",
        "allowDiscussion",
        "contributors",
        "creation_date",
        "creators",
        "effectiveDate",
        "expirationDate",
        "language",
        "location",
        "modification_date",
        "rights",
        "subject",
    ])

    # Update kwargs with the field values to copy from source
    for field_name, field in get_fields(source).items():
        # Prioritize field values passed as kwargs
        if field_name in kwargs:
            continue
        # Skip framework internal fields by name
        if field_name in skip:
            continue
        # Skip fields of non-suitable types
        if hasattr(field, "getType") and field.getType() in skip:
            continue
        # Skip readonly fields
        if getattr(field, "readonly", False):
            continue
        # Skip non-readable fields
        perm = getattr(field, "read_permission", View)
        if perm and not check_permission(perm, source):
            continue

        # do not wake-up objects unnecessarily: prefer the raw getters
        if hasattr(field, "getRaw"):
            field_value = field.getRaw(source)
        elif hasattr(field, "get_raw"):
            field_value = field.get_raw(source)
        elif hasattr(field, "getAccessor"):
            accessor = field.getAccessor(source)
            field_value = accessor()
        else:
            field_value = field.get(source)

        # Do a hard copy of the value if mutable type, so that the copy does
        # not share the container instance with the source
        if isinstance(field_value, (list, dict, set)):
            field_value = copy.deepcopy(field_value)
        kwargs[field_name] = field_value

    # Create a copy
    return create(container, portal_type, *args, **kwargs)
278
279
280
def edit(obj, check_permissions=True, **kwargs):
    """Updates the values of object fields with the new values passed-in

    Keyword names that do not match a field of the object are ignored.

    :param obj: The object to update
    :param check_permissions: When True, the write permission of each field
        is checked against the object before the value is set
    :type check_permissions: bool
    :param kwargs: Mapping of field name -> new value
    :raises ValueError: if one of the fields to update is readonly
    :raises Unauthorized: if `check_permissions` is True and the write
        permission of a field is not granted
    """
    # Imported here to prevent circular dependencies. The explicit relative
    # form resolves to the same sibling module as the former implicit
    # `from security import ...`, but also works on Python 3 (PEP 328)
    from .security import check_permission

    fields = get_fields(obj)
    for name, value in kwargs.items():
        field = fields.get(name, None)
        if not field:
            # no such field in the schema of the object
            continue

        # cannot update readonly fields
        if getattr(field, "readonly", False):
            raise ValueError("Field '{}' is readonly".format(name))

        # check field writable permission
        if check_permissions:
            perm = getattr(field, "write_permission", ModifyPortalContent)
            if perm and not check_permission(perm, obj):
                raise Unauthorized("Field '{}' is not writeable".format(name))

        # Set the value, through the mutator if the field provides one
        if hasattr(field, "getMutator"):
            mutator = field.getMutator(obj)
            mapply(mutator, value)
        else:
            field.set(obj, value)
308
309
310
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: Optional object used for the lookup with `getToolByName`
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned if the tool was not found. If a string is
        passed, it is handled as the name of a fallback tool to look up
    :returns: Portal Tool
    """
    if context is not None:
        # A context was passed in: try the lookup relative to it first
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (APIError, AttributeError) as exc:
            # https://github.com/senaite/bika.lims/issues/396
            message = (
                "get_tool::getToolByName({}, '{}') failed: {} "
                "-> falling back to plone.api.portal.get_tool('{}')"
            ).format(repr(context), name, repr(exc), name)
            logger.warn(message)
            return get_tool(name, default=default)

    # No context given: rely on the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        if isinstance(default, six.string_types):
            return get_tool(default)
        return default
341
342
343
def fail(msg=None):
    """Raise an `APIError` with the given message

    :param msg: Reason of the failure; a generic text is used if omitted
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
349
350
351
def is_object(brain_or_object):
    """Tell whether the passed in object is of a supported kind

    Supported are the portal root, supermodels, AT contents, dexterity
    contents and catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    # evaluated lazily and in this order; stops at the first positive check
    checks = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    return any(check(brain_or_object) for check in checks)
369
370
371
def get_object(brain_object_uid, default=_marker):
    """Get the full content object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Value returned if the passed in value is not supported or
        if no object was found for the passed in UID. If omitted, an
        `APIError` is raised in these cases instead
    :returns: The full object
    """
    if is_uid(brain_object_uid):
        # forward the default, so that an unknown UID honours it as well
        return get_object_by_uid(brain_object_uid, default=default)
    if is_supermodel(brain_object_uid):
        # supermodels keep the wrapped content object in `instance`
        return brain_object_uid.instance
    if not is_object(brain_object_uid):
        if default is _marker:
            fail("{} is not supported.".format(repr(brain_object_uid)))
        return default
    if is_brain(brain_object_uid):
        return brain_object_uid.getObject()
    return brain_object_uid
390
391
392
def is_portal(brain_or_object):
    """Tell whether the passed in object provides `ISiteRoot`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is the portal root object
    :rtype: bool
    """
    provided = ISiteRoot.providedBy(brain_or_object)
    return provided
401
402
403
def is_brain(brain_or_object):
    """Tell whether the passed in object provides `ICatalogBrain`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a catalog brain
    :rtype: bool
    """
    provided = ICatalogBrain.providedBy(brain_or_object)
    return provided
412
413
414
def is_supermodel(brain_or_object):
    """Tell whether the passed in object provides `ISuperModel`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a supermodel
    :rtype: bool
    """
    # imported at call time to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel

    provided = ISuperModel.providedBy(brain_or_object)
    return provided
425
426
427
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object provides `IDexterityContent`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a dexterity content type
    :rtype: bool
    """
    provided = IDexterityContent.providedBy(brain_or_object)
    return provided
436
437
438
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an instance of the Archetypes
    `BaseObject` class

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an AT content type
    :rtype: bool
    """
    at_based = isinstance(brain_or_object, BaseObject)
    return at_based
447
448
449
def is_dx_type(portal_type):
    """Tell whether the portal type is DX based

    A type is handled as DX based if its type information (fti) has no
    product set.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is DX based
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    return not fti.product
460
461
462
def is_at_type(portal_type):
    """Tell whether the portal type is AT based, i.e. not DX based

    :param portal_type: The portal type name to check
    :returns: True if the portal type is AT based
    """
    dx_based = is_dx_type(portal_type)
    return not dx_based
469
470
471
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute of the passed in value takes precedence (it
    is called if callable). Otherwise, the full object is checked for
    `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))
    folderish = brain_or_object.is_folderish
    return folderish() if callable(folderish) else folderish
484
485
486
def get_portal_type(brain_or_object):
    """Read the `portal_type` attribute of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: if the passed in value is not a supported object
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
497
498
499
def get_schema(brain_or_object):
    """Return the schema of the content

    For dexterity contents, the schema is looked up through the type
    information (fti) of the portal type. For AT contents, the `Schema()` of
    the object is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: for the portal root and for objects without schema
    """
    content = get_object(brain_or_object)
    if is_portal(content):
        fail("get_schema can't return schema of portal root")
    if is_dexterity_content(content):
        types_tool = get_tool("portal_types")
        return types_tool.getTypeInfo(content.portal_type).lookupSchema()
    if is_at_content(content):
        return content.Schema()
    fail("{} has no Schema.".format(brain_or_object))
516
517
518
def get_behaviors(portal_type):
    """Return the behaviors registered in the type information (fti) of the
    portal type

    :param portal_type: DX portal type name
    :returns: The `behaviors` attribute of the fti
    :raises TypeError: if the fti has a product set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if not fti.product:
        return fti.behaviors
    raise TypeError("Expected DX type, got AT type instead.")
528
529
530
def enable_behavior(portal_type, behavior_id):
    """Add the behavior to the type information (fti) of the portal type

    Does nothing if the behavior is enabled already.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to enable
    :raises TypeError: if the fti has a product set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    if behavior_id in fti.behaviors:
        return

    fti.behaviors += (behavior_id, )
    # invalidate schema cache
    notify(SchemaInvalidatedEvent(portal_type))
545
546
547
def disable_behavior(portal_type, behavior_id):
    """Remove the behavior from the type information (fti) of the portal type

    The schema invalidation event is notified in any case, also if the
    behavior was not enabled.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to disable
    :raises TypeError: if the fti has a product set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")
    remaining = tuple(b for b in fti.behaviors if b != behavior_id)
    fti.behaviors = remaining
    # invalidate schema cache
    notify(SchemaInvalidatedEvent(portal_type))
560
561
562
def get_fields(brain_or_object):
    """Build a name to field mapping of the object

    For dexterity contents, the fields of the schema come first, followed by
    the fields of the interfaces of the enumerated behaviors.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)
    if not is_dexterity_content(content):
        return OrderedDict(schema)

    # fields directly provided by the schema interface
    pairs = getFieldsInOrder(schema)
    # followed by the fields coming from behaviors
    assignable = IBehaviorAssignable(content)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            pairs.extend(getFieldsInOrder(behavior.interface))
    return OrderedDict(pairs)
583
584
585
def get_id(brain_or_object):
    """Return the Plone ID of the brain or object

    For brains, the `getId` or `id` attribute is returned if available, so
    that the object does not need to be fetched.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for attr in ("getId", "id"):
            if base_hasattr(brain_or_object, attr):
                return getattr(brain_or_object, attr)
    return get_object(brain_or_object).getId()
599
600
601
def get_title(brain_or_object):
    """Return the Title of the brain or object

    For brains, the `Title` attribute is returned if available, so that the
    object does not need to be fetched.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Title")
    if from_brain:
        return brain_or_object.Title
    return get_object(brain_or_object).Title()
612
613
614
def get_description(brain_or_object):
    """Return the Description of the brain or object

    For brains, the `Description` attribute is returned if available, so that
    the object does not need to be fetched.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Description")
    if from_brain:
        return brain_or_object.Description
    return get_object(brain_or_object).Description()
626
627
628
def get_uid(brain_or_object):
    """Return the Plone UID of the brain or object

    An UID is returned as it is and the portal root gets the UID '0'. For
    brains, the `UID` attribute is returned if available, so that the object
    does not need to be fetched.

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_uid(brain_or_object):
        return brain_or_object
    if is_portal(brain_or_object):
        return '0'
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "UID")
    if from_brain:
        return brain_or_object.UID
    return get_object(brain_or_object).UID()
643
644
645
def get_url(brain_or_object):
    """Return the absolute URL of the brain or object

    For brains, `getURL()` is used if available, so that the object does not
    need to be fetched.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getURL")
    if from_brain:
        return brain_or_object.getURL()
    return get_object(brain_or_object).absolute_url()
656
657
658
def get_icon(brain_or_object, html_tag=True):
    """Get the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url. An
        empty string is returned if the portal type has no icon
    :rtype: string
    """
    # stdlib helper available on Python 2 and 3
    from xml.sax.saxutils import escape

    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url

    # The title is content entered by users and ends up inside a
    # double-quoted HTML attribute: escape markup characters and the quote
    # itself, so that the title cannot break out of the attribute.
    # Non-string values are rendered as before
    title = get_title(brain_or_object)
    if isinstance(title, six.string_types):
        title = escape(title, {'"': "&quot;"})

    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
        url=url, title=title)
    return tag
682
683
684
def get_object_by_uid(uid, default=_marker):
    """Look up an object by its UID

    The UID '0' is reserved for the portal object.

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if the UID is empty or if no object was
        found. If omitted, an `APIError` is raised in these cases instead
    :returns: Found Object or the default value
    """
    if not uid:
        # nothing to look up
        if default is _marker:
            fail("get_object_by_uid requires UID as first argument; got {} "
                 "instead".format(uid))
        return default

    # we defined the portal object UID to be '0'
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)
    if brain is not None:
        return get_object(brain)

    if default is _marker:
        fail("No object found for UID {}".format(uid))
    return default
711
712
713
def get_brain_by_uid(uid, default=None):
    """Query the `uid_catalog` for the brain with the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if the passed in value is not an UID or if
        the query did not return exactly one brain
    :returns: ZCatalog brain or the default value
    """
    if not is_uid(uid):
        return default

    uid_catalog = get_tool("uid_catalog")
    matches = uid_catalog(UID=uid)
    # only an unambiguous result is accepted
    return matches[0] if len(matches) == 1 else default
731
732
733
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path (or absolute URL) of the object to find
    :type path: string
    :param default: Value returned if the path is empty, if it is not located
        inside the portal or if no object could be traversed. If omitted, an
        `APIError` is raised in these cases instead
    :returns: Found Object or the default value
    """

    # nothing to do here
    if not path:
        if default is not _marker:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if default is not _marker:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    # Traverse with `None` as the fallback value: handing our private
    # `_marker` sentinel over to `restrictedTraverse` would make a failed
    # traversal return the sentinel object itself to the caller
    obj = portal.restrictedTraverse(path, None)
    if obj is None:
        if default is not _marker:
            return default
        fail("No object found at path {}".format(path))
    return obj
766
767
768
def get_path(brain_or_object):
    """Return the physical path of the brain or object

    For brains, `getPath()` is used. For objects, the segments of
    `getPhysicalPath()` are joined with slashes.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if is_brain(brain_or_object):
        return brain_or_object.getPath()
    segments = get_object(brain_or_object).getPhysicalPath()
    return "/".join(segments)
779
780
781
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the brain or object

    The portal root returns its own path. For brains, the last segment of the
    brain path is cut off, so that the object does not need to be fetched.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if is_brain(brain_or_object):
        head, _, _ = get_path(brain_or_object).rpartition("/")
        return head
    parent = get_object(brain_or_object).aq_parent
    return get_path(parent)
795
796
797
def get_parent(brain_or_object, catalog_search=False):
    """Return the parent of the content/catalog brain

    With `catalog_search` enabled, the parent is searched by its path in the
    `portal_catalog` and the brain is returned instead of the full parent
    object. If the search returned no results, the full parent object is
    returned. The parent of the portal is the portal itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if is_portal(brain_or_object):
        return get_portal()

    # No catalog involved: return the acquisition parent of the full object
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # parent is the portal object
    if parent_path == get_path(get_portal()):
        return get_portal()

    # query the portal catalog for exactly the parent path
    catalog = get_portal_catalog()
    brains = catalog(path={"query": parent_path, "depth": 0})
    if brains:
        return brains[0]

    # No results fallback: return the parent object
    return get_object(brain_or_object).aq_parent
839
840
841
def search(query, catalog=_marker):
    """Run the query against a single catalog

    If no catalog is passed in, the first catalog registered for each of the
    queried portal types is used, and the `portal_catalog` if the query does
    not contain any portal type.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises APIError: if the query is not a dictionary or if more than one
        catalog would have to be queried
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # Portal types to query, always handled as a list
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is _marker:
        # No catalog specified: take the first registered/default catalog of
        # each queried portal type
        catalogs = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in portal_types
        ]
    elif isinstance(catalog, (list, tuple)):
        # Multiple user defined catalogs
        catalogs = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        # Single user defined catalog
        catalogs = [get_tool(catalog)]

    # Drop duplicates and fall back to the portal catalog if none is left
    catalogs = list(set(catalogs)) or [get_portal_catalog()]

    # We only support **single** catalog queries
    if len(catalogs) > 1:
        fail("Multi Catalog Queries are not supported!")

    return catalogs[0](query)
887
888
889
def safe_getattr(brain_or_object, attr, default=_marker):
    """Read an attribute and call it if it is callable

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value returned if the attribute is missing or the access
        is unauthorized. Without a default, `fail` is called instead.
    :returns: Attribute value, or the return value of the callable attribute
    :rtype: obj
    """
    has_default = default is not _marker
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        # Callables are invoked inside the try block, so that an
        # Unauthorized raised by the call is handled below as well
        return value() if callable(value) else value
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
913
914
915
def get_portal_catalog():
    """Look up the `portal_catalog` tool

    :returns: Portal Catalog Tool
    """
    catalog_id = "portal_catalog"
    return get_tool(catalog_id)
921
922
923
def get_review_history(brain_or_object, rev=True):
    """Return the workflow review history of the given brain or context.

    A workflow error is logged and results in an empty history.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: If True, the history is returned in reversed order
    :type rev: bool
    :returns: Workflow history
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        wf_tool = get_tool("portal_workflow")
        history = wf_tool.getInfoFor(obj, "review_history")
    except WorkflowException as exc:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(exc)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
        # Expression.StateChangeInfo, that always returns a list, except when
        # no review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        # Neither a list nor a tuple: log it and continue with no history
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
960
961
962
def get_revision_history(brain_or_object):
    """Return the full content history of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The result of `ContentHistoryView.fullHistory()` for the object
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # The request is optional: None is used if the object has no REQUEST
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
973
974
975
def get_workflows_for(brain_or_object):
    """Return the workflow chain assigned to the given brain or context.

    Note: A portal_type name (string) is accepted as well and is passed
    unchanged to the workflow tool.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    if not isinstance(target, six.string_types):
        # Not a portal_type name: resolve the brain to its object
        target = get_object(target)
    return wf_tool.getChainFor(target)
990
991
992
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the value of a workflow state variable of the brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status
    :rtype: str
    """
    # Prefer the value stored in the catalog brain, if it has such a column
    if is_brain(brain_or_object) and state_var in brain_or_object.schema():
        return brain_or_object[state_var]

    # Otherwise ask the workflow tool for the state of the object
    wf_tool = get_tool("portal_workflow")
    obj = get_object(brain_or_object)
    return wf_tool.getInfoFor(ob=obj, name=state_var, default='')
1011
1012
1013
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the workflow status the object had before the current one

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value returned if no previous status is found
    :returns: status
    :rtype: str
    """
    skipped = skip if isinstance(skip, (list, tuple)) else []

    # Collapse consecutive duplicates, because some transitions might happen
    # more than once consecutively (e.g. publish)
    entries = (
        entry for entry, _ in groupby(get_review_history(brain_or_object)))

    # The first entry of the history is the current one: drop it
    next(entries, None)

    for entry in entries:
        status = entry.get("review_state")
        if status not in skipped:
            return status
    return default
1040
1041
1042
def get_creation_date(brain_or_object):
    """Return the `created` value of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    """
    created = getattr(brain_or_object, "created", None)
    if created is None:
        template = "Object {} has no creation date "
        fail(template.format(repr(brain_or_object)))
    # `created` may be a plain value or an accessor that has to be called
    return created() if callable(created) else created
1057
1058
1059
def get_modification_date(brain_or_object):
    """Return the `modified` value of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    modified = getattr(brain_or_object, "modified", None)
    if modified is None:
        template = "Object {} has no modification date "
        fail(template.format(repr(brain_or_object)))
    # `modified` may be a plain value or an accessor that has to be called
    return modified() if callable(modified) else modified
1074
1075
1076
def get_review_status(brain_or_object):
    """Return the `review_state` of a brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    # Brains expose the state directly as an attribute
    return brain_or_object.review_state
1087
1088
1089
def is_active(brain_or_object):
    """Check that the review state is neither 'inactive' nor 'cancelled'.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    inactive_states = ["cancelled", "inactive"]
    return get_review_status(brain_or_object) not in inactive_states
1100
1101
1102
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Return the catalogs registered for the given portal_type, catalog
    brain or content object

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog to use if no catalog is registered
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", default=None)
    if at_tool is None:
        # Without the archetype tool, only the default catalog is returned
        return [get_tool(default, default="portal_catalog")]

    registered = []

    # Look up the catalogs registered for the portal type
    if is_object(brain_or_object):
        registered = at_tool.getCatalogsByType(
            get_portal_type(brain_or_object))
    if isinstance(brain_or_object, six.string_types):
        registered = at_tool.getCatalogsByType(brain_or_object)

    return registered or [get_tool(default)]
1128
1129
1130
def get_transitions_for(brain_or_object):
    """Collect the transitions offered by all workflows of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    transitions = []
    instance = get_object(brain_or_object)
    for wf_id in get_workflows_for(brain_or_object):
        candidates = wf_tool[wf_id].getTransitionsFor(instance)
        # Only add transitions not already collected from a previous workflow
        transitions += [t for t in candidates if t not in transitions]
    return transitions
1146
1147
1148
def do_transition_for(brain_or_object, transition):
    """Perform the named workflow transition on the passed in object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: The id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, six.string_types):
        fail("Transition type needs to be string, got '%s'" % type(transition))
    obj = get_object(brain_or_object)
    try:
        ploneapi.content.transition(obj, transition)
    except ploneapi.exc.InvalidParameterError as exc:
        # Report the rejected transition through `fail`
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, obj, str(exc))
        fail(message)
    return obj
1164
1165
1166
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted roles that hold the given permission on the object.

    :param permission: The permission to look up the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    obj = get_object(brain_or_object)
    roles = rolesForPermissionOn(permission, obj)
    # Remove duplicate roles before sorting
    return sorted(set(roles))
1177
1178
1179
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: The versioning policy the repository tool has to support
        for the object
    :type policy: str
    :returns: True if the object is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # Honor the policy passed in instead of a hard-coded policy name
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
1191
1192
1193
def get_version(brain_or_object):
    """Return the `version_id` of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object, or None if not available
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # Read from the unwrapped object to avoid acquiring the attribute;
        # objects without a version_id are reported as version 0
        return getattr(aq_base(obj), "version_id", 0)
    return None
1205
1206
1207
def get_view(name, context=None, request=None, default=None):
    """Look up a view by name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view; the portal is used if no
        (truthy) context is passed in
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view; the global request is
        used if no (truthy) request is passed in
    :type request: HTTPRequest object
    :param default: Value returned if no view is found
    :returns: The view or the default value
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    adapted = (get_object(context), request)
    view = queryMultiAdapter(adapted, name=name)
    return default if view is None else view
1225
1226
1227
def get_request():
    """Return the request provided by `globalrequest.getRequest`

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
1234
1235
1236
def get_test_request():
    """Create a TestRequest that directly provides IAttributeAnnotatable

    :returns: The new TestRequest object
    """
    test_request = TestRequest()
    directlyProvides(test_request, IAttributeAnnotatable)
    return test_request
1242
1243
1244
def get_group(group_or_groupname):
    """Return the Plone group for the given group or group name

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname:  GroupData/str
    :returns: Plone GroupData, or None if nothing was passed in
    """
    if not group_or_groupname:
        return None
    # Anything providing `_getGroup` is treated as a group object already
    is_group = hasattr(group_or_groupname, "_getGroup")
    if is_group:
        return group_or_groupname
    groups_tool = get_tool("portal_groups")
    return groups_tool.getGroupById(group_or_groupname)
1258
1259
1260
def get_user(user_or_username):
    """Return the Plone user for the given user or user name

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None if the value passed in is neither a
        MemberData object nor a string
    """
    if isinstance(user_or_username, six.string_types):
        # Look up the member by its login name
        return get_member_by_login_name(get_portal(), user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1272
1273
1274
def get_user_properties(user_or_username):
    """Return the merged properties of all property sheets of the user

    :param user_or_username: Plone user or user id
    :returns: Dictionary of the user properties; empty if the user was not
        found or its `getUser` attribute is not callable
    :rtype: dict
    """
    user = get_user(user_or_username)
    if user is None or not callable(user.getUser):
        return {}
    plone_user = user.getUser()
    properties = {}
    # Sheets are merged in listing order, later sheets override earlier ones
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1291
1292
1293
def get_users_by_roles(roles=None):
    """Search the Plone members having the given role(s)

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    # A single role is wrapped into a list
    role_list = roles if isinstance(roles, (tuple, list)) else [roles]
    membership_tool = get_tool("portal_membership")
    return membership_tool.searchForMembers(roles=role_list)
1304
1305
1306
def get_current_user():
    """Return the user provided by `plone.api.user.get_current`

    :returns: Current User
    """
    current_user = ploneapi.user.get_current()
    return current_user
1312
1313
1314
def get_user_contact(user, contact_types=None):
    """Returns the associated contact of a Plone user

    If the user passed in has no contact associated, return None.
    The `contact_types` parameter filter the portal types for the search.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search.
        Defaults to `['Contact', 'LabContact']` if None is passed in.
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    # Build the default per call to avoid a shared mutable default argument
    if contact_types is None:
        contact_types = ['Contact', 'LabContact']

    query = {'portal_type': contact_types, 'getUsername': user.id}
    brains = search(query, catalog='portal_catalog')
    if not brains:
        return None

    if len(brains) > 1:
        # Oops, the user has multiple contacts assigned, return None
        contacts = [brain.Title for brain in brains]
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
        err_msg = err_msg.format(user.id, ','.join(contacts))
        logger.error(err_msg)
        return None

    return get_object(brains[0])
1341
1342
1343
def get_user_client(user_or_contact):
    """Return the client the contact of a Plone user belongs to

    Returns None if the user passed in has no contact or if the contact does
    not belong to any client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact:
        return None

    if ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # A client contact: its parent is expected to be the client
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # Not a contact: resolve the contact linked to the user first
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if not IContact.providedBy(contact):
        return None
    return get_user_client(contact)
1367
1368
1369
def get_current_client():
    """Return the client of the current user, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
1375
1376
1377
def get_cache_key(brain_or_object):
    """Build a cache key for a common brain or object

    The key joins the portal type, id, UID, URL and the modification date
    (in microseconds) of the brain or object with dashes.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = [
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # handle different domains gracefully
        get_url(brain_or_object),
        # the modification date as microseconds
        get_modification_date(brain_or_object).micros(),
    ]
    return "-".join(str(part) for part in parts)
1395
1396
1397
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for cached methods taking a brain or object

    :param method: The cached method (not used for the key)
    :param self: The instance of the cached method (not used for the key)
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if no brain or object was passed in
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1408
1409
1410
def normalize_id(string):
    """Normalize the string with the IIDNormalizer utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'"
        fail(message.format(type(string)))
    # look up the id normalizer utility
    normalizer = getUtility(IIDNormalizer)
    return normalizer.normalize(string)
1424
1425
1426
def normalize_filename(string):
    """Normalize the string with the IFileNameNormalizer utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'"
        fail(message.format(type(string)))
    # look up the filename normalizer utility
    normalizer = getUtility(IFileNameNormalizer)
    return normalizer.normalize(string)
1440
1441
1442
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    :param uid: The uid to check
    :param validate: If False, only checks that the uid is either '0' or a
        string of 32 characters that matches `UID_RX`. If True, also
        verifies that a brain exists in the uid_catalog for the uid.
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    uid_catalog = get_tool('uid_catalog')
    brains = uid_catalog(UID=uid)
    if brains:
        assert (len(brains) == 1)
    return len(brains) > 0
1469
1470
1471
def is_date(date):
    """Checks if the passed in value is a Zope DateTime or Python datetime

    :param date: The date to check
    :type date: DateTime or datetime
    :return: True if a valid date; False for falsy values and other types
    :rtype: bool
    """
    if date:
        return isinstance(date, (DateTime, datetime))
    return False
1482
1483
1484
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Fallback converted and returned if the value is empty or
        cannot be converted
    :return: The DateTime representation of the value passed in or default
    """
    if isinstance(value, DateTime):
        return value
    if not value:
        if default is None:
            return None
        return to_date(default)
    try:
        # Check against all string types, so that unicode values with dots
        # are parsed with the international date format as well
        if isinstance(value, six.string_types) and '.' in value:
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
            return DateTime(value, datefmt='international')
        return DateTime(value)
    except (TypeError, ValueError, DateTimeError):
        # Conversion failed: fall back to the (converted) default
        return to_date(default)
1504
1505
1506
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Sum up the given time parts and return the total number of minutes

    :param days: Number of days
    :param hours: Number of hours
    :param minutes: Number of minutes
    :param seconds: Number of seconds
    :param milliseconds: Number of milliseconds
    :param round_to_int: If True, the total is rounded and returned as int
    :returns: Total number of minutes as int, or as float if not rounded
    """
    from_days = float(days) * 24 * 60
    from_hours = float(hours) * 60
    from_minutes = float(minutes)
    from_seconds = float(seconds) / 60
    from_millis = float(milliseconds) / 1000 / 60
    total = from_days + from_hours + from_minutes + from_seconds + from_millis
    if round_to_int:
        return int(round(total))
    return total
1513
1514
1515
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Return the given time as a string in "xd yh zm" format

    Parts with a value of zero are omitted, except for the hours when both
    days and minutes are displayed.
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))
    day_count = delta.days
    hour_count, remainder = divmod(delta.seconds, 3600)
    minute_count = remainder // 60

    minute_part = "{}m ".format(str(minute_count)) if minute_count else ""
    day_part = "{}d ".format(str(day_count)) if day_count else ""
    if minute_part and day_part:
        # Always display the hours between days and minutes
        hour_part = "{}h ".format(str(hour_count))
    elif hour_count:
        hour_part = "{}h ".format(str(hour_count))
    else:
        hour_part = ""
    return "".join([day_part, hour_part, minute_part]).strip()
1531
1532
1533
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    :param value: The value to be converted to an int
    :param default: Fallback used if the value cannot be converted. None is
        returned as-is, any other default is converted to int itself.
        Without a default, `fail` is called.
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is raised by int() for infinite float values, which
        # are floatable, but have no int representation
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1550
1551
1552
def is_floatable(value):
    """Checks if the passed in value can be converted with `float()`

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    return True
1564
1565
1566
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Fallback that is converted to float and returned if the
        value is not floatable. Without a default, `fail` is called.
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    return float(value)
1579
1580
1581
def float_to_string(value, default=_marker):
    """Convert a float value to string without exponential notation

    The whole fraction is preserved. Floatable strings are returned as-is.

    :param value: The float value to be converted to a string
    :type value: str, float, int
    :param default: Value returned if the value is not floatable. Without a
        default, `fail` is called.
    :returns: String representation of the float w/o exponential notation
    :rtype: str
    """
    if not is_floatable(value):
        if default is not _marker:
            return default
        fail("Value %s is not floatable" % repr(value))

    # Leave floatable string values unchanged
    if isinstance(value, six.string_types):
        return value

    number = float(value)
    text = str(number)

    # Inspect the part behind the decimal point, e.g. "23e-26" of 1.23e-26,
    # or the whole text if there is no decimal point, e.g. "1e-07"
    _, dot, tail = text.partition(".")
    mantissa = tail if dot else text

    if "e-" in mantissa:
        digits, exponent = mantissa.split("e-")
        # the precision has to cover the fraction digits and the zeros
        precision = len(digits) + int(exponent)
        text = "{0:.{1}f}".format(number, precision)
    elif "e+" in mantissa:
        # positive exponents, e.g. 1e+16, don't need a fractional part
        text = "{:.0f}".format(number)

    # cut off trailing zeros
    if "." in text:
        text = text.rstrip("0").rstrip(".")

    return text
1625
1626
1627
def to_searchable_text_metadata(value):
    """Convert the given metadata value to searchable text

    Empty values, missing values, UIDs and booleans result in an empty
    string. Lists, tuples and dicts are converted item by item and joined
    with spaces.

    :param value: The raw value of the metadata column
    :returns: Searchable text; an empty string if nothing is searchable
    """
    if (not value or value is Missing.Value or is_uid(value)
            or isinstance(value, bool)):
        return u""
    if isinstance(value, (list, tuple)):
        converted = (to_searchable_text_metadata(item) for item in value)
        # skip the items that resulted in an empty text
        return " ".join([text for text in converted if text])
    if isinstance(value, dict):
        return to_searchable_text_metadata(value.values())
    if is_date(value):
        from senaite.core.api.dtime import date_to_string
        return date_to_string(value, "%Y-%m-%d")
    if is_at_content(value):
        return to_searchable_text_metadata(get_title(value))
    text = value if isinstance(value, six.string_types) else str(value)
    return safe_unicode(text)
1655
1656
1657
def get_registry_record(name, default=None):
    """Look up the value of a registry record

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    record = ploneapi.portal.get_registry_record(name, default=default)
    return record
1667
1668
1669
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Build a Plone DisplayList from the given items

    :param pairs: list of key, value pairs, one single key, value pair or a
        single string that is used as both key and value
    :param sort_by: Sort the items either by "key" or by "value"; any other
        value leaves the list unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a single string is used as key and value
    if isinstance(pairs, six.string_types):
        pairs = [pairs, pairs]

    for pair in pairs:
        if isinstance(pair, (tuple, list)):
            # pairs is a list of lists -> add each pair
            display_list.add(*pair)
        elif isinstance(pair, six.string_types):
            # pairs is just a single pair -> add it and stop
            display_list.add(*pairs)
            break

    # add the empty option
    if allow_empty:
        display_list.add("", "")

    # sort by key/value
    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
1701
1702
1703
def text_to_html(text, wrap="p", encoding="utf8"):
    """Replace the newline characters of the text with HTML `<br/>` tags

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the converted text in. A falsy
        value disables the wrapping.
    :param encoding: Encoding used for the returned HTML
    :returns: HTML converted and encoded text; an empty string if no text
        was passed in
    """
    if not text:
        return ""
    # the text is handled internally as unicode
    body = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        body = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=body)
    # return the encoded html
    return body.encode(encoding)
1721
1722
1723
def to_utf8(string, default=_marker):
    """Encode the string to UTF8

    :param string: String to be encoded to UTF8
    :param default: Value returned if no string was passed in. Without a
        default, `fail` is called.
    :returns: UTF8 encoded string
    """
    if isinstance(string, six.string_types):
        return safe_unicode(string).encode("utf8")
    if default is _marker:
        fail("Expected string type, got '%s'" % type(string))
    return default
1734
1735
1736
def is_temporary(obj):
    """Returns whether the given object is temporary or not

    An object is considered temporary if it or its parent has no id or an id
    matching `UID_RX`, if it has no parent, or if it is an AT content
    contained in a parent with meta type "TempFolder".

    :param obj: the object to evaluate
    :returns: True if the object is temporary
    """
    def has_temporary_id(item):
        # ids are read from the unwrapped item to avoid acquisition
        item_id = getattr(aq_base(item), "id", None)
        return item_id is None or bool(UID_RX.match(item_id))

    if has_temporary_id(obj):
        return True

    parent = aq_parent(aq_inner(obj))
    if not parent or has_temporary_id(parent):
        return True

    if not is_at_content(obj):
        return False

    # Checks to see if we are created inside the portal_factory. We don't
    # rely here on AT's isFactoryContained because the function is patched
    return getattr(aq_base(parent), "meta_type", "") == "TempFolder"
1761
1762
1763
def is_string(thing):
    """Checks if the passed in object is one of the `six.string_types`

    :param thing: object to test
    :returns: True if the object is a string
    """
    string_types = six.string_types
    return isinstance(thing, string_types)
1770