Passed
Push — 2.x ( bb5fa7...513331 )
by Ramon
05:52
created

bika.lims.api.edit()   B

Complexity

Conditions 8

Size

Total Lines 28
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 28
rs 7.3333
c 0
b 0
f 0
cc 8
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import re
22
from collections import OrderedDict
23
from datetime import datetime
24
from datetime import timedelta
25
from itertools import groupby
26
27
import six
28
29
import Missing
30
from AccessControl.PermissionRole import rolesForPermissionOn
31
from Acquisition import aq_base
32
from Acquisition import aq_inner
33
from Acquisition import aq_parent
34
from bika.lims import logger
35
from bika.lims.interfaces import IClient
36
from bika.lims.interfaces import IContact
37
from bika.lims.interfaces import ILabContact
38
from DateTime import DateTime
39
from DateTime.interfaces import DateTimeError
40
from plone import api as ploneapi
41
from plone.api.exc import InvalidParameterError
42
from plone.app.layout.viewlets.content import ContentHistoryView
43
from plone.behavior.interfaces import IBehaviorAssignable
44
from plone.dexterity.interfaces import IDexterityContent
45
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
46
from plone.i18n.normalizer.interfaces import IIDNormalizer
47
from plone.memoize.volatile import DontCache
48
from Products.Archetypes.atapi import DisplayList
49
from Products.Archetypes.BaseObject import BaseObject
50
from Products.Archetypes.event import ObjectInitializedEvent
51
from Products.Archetypes.utils import mapply
52
from Products.CMFCore.interfaces import IFolderish
53
from Products.CMFCore.interfaces import ISiteRoot
54
from Products.CMFCore.permissions import ModifyPortalContent
55
from Products.CMFCore.utils import getToolByName
56
from Products.CMFCore.WorkflowCore import WorkflowException
57
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
58
from Products.CMFPlone.utils import _createObjectByType
59
from Products.CMFPlone.utils import base_hasattr
60
from Products.CMFPlone.utils import safe_unicode
61
from Products.PlonePAS.tools.memberdata import MemberData
62
from Products.ZCatalog.interfaces import ICatalogBrain
63
from zope import globalrequest
64
from zope.annotation.interfaces import IAttributeAnnotatable
65
from zope.component import getUtility
66
from zope.component import queryMultiAdapter
67
from zope.component.interfaces import IFactory
68
from zope.event import notify
69
from zope.interface import directlyProvides
70
from zope.lifecycleevent import ObjectCreatedEvent
71
from zope.publisher.browser import TestRequest
72
from zope.schema import getFieldsInOrder
73
from zope.security.interfaces import Unauthorized
74
75
"""SENAITE LIMS Framework API
76
77
Please see bika.lims/docs/API.rst for documentation.
78
79
Architecural Notes:
80
81
Please add only functions that do a single thing for a single object.
82
83
Good: `def get_foo(brain_or_object)`
84
Bad:  `def get_foos(list_of_brain_objects)`
85
86
Why?
87
88
Because it makes things more complex. You can always use a pattern like this to
89
achieve the same::
90
91
    >>> foos = map(get_foo, list_of_brain_objects)
92
93
Please add for all of your functions a descriptive test in docs/API.rst.
94
95
Thanks.
96
"""
97
98
_marker = object()
99
100
UID_RX = re.compile("[a-z0-9]{32}$")
101
102
103
class APIError(Exception):
104
    """Base exception class for bika.lims errors."""
105
106
107
def get_portal():
108
    """Get the portal object
109
110
    :returns: Portal object
111
    """
112
    return ploneapi.portal.getSite()
113
114
115
def get_setup():
116
    """Fetch the `bika_setup` folder.
117
    """
118
    portal = get_portal()
119
    return portal.get("bika_setup")
120
121
122
def get_bika_setup():
123
    """Fetch the `bika_setup` folder.
124
    """
125
    return get_setup()
126
127
128
def get_senaite_setup():
129
    """Fetch the new DX `setup` folder.
130
    """
131
    portal = get_portal()
132
    return portal.get("setup")
133
134
135
def create(container, portal_type, *args, **kwargs):
136
    """Creates an object in Bika LIMS
137
138
    This code uses most of the parts from the TypesTool
139
    see: `Products.CMFCore.TypesTool._constructInstance`
140
141
    :param container: container
142
    :type container: ATContentType/DexterityContentType/CatalogBrain
143
    :param portal_type: The portal type to create, e.g. "Client"
144
    :type portal_type: string
145
    :param title: The title for the new content object
146
    :type title: string
147
    :returns: The new created object
148
    """
149
    from bika.lims.utils import tmpID
150
151
    id = kwargs.pop("id", tmpID())
152
    title = kwargs.pop("title", "New {}".format(portal_type))
153
154
    # get the fti
155
    types_tool = get_tool("portal_types")
156
    fti = types_tool.getTypeInfo(portal_type)
157
158
    if fti.product:
159
        # create the AT object
160
        obj = _createObjectByType(portal_type, container, id)
161
        # update the object with values
162
        edit(obj, check_permissions=False, title=title, **kwargs)
163
        # auto-id if required
164
        if obj._at_rename_after_creation:
165
            obj._renameAfterCreation(check_auto_id=True)
166
        # we are no longer under creation
167
        obj.unmarkCreationFlag()
168
        # notify that the object was created
169
        notify(ObjectInitializedEvent(obj))
170
    else:
171
        # newstyle factory
172
        factory = getUtility(IFactory, fti.factory)
173
        obj = factory(id, *args, **kwargs)
174
        if hasattr(obj, '_setPortalTypeName'):
175
            obj._setPortalTypeName(fti.getId())
176
        # set the title
177
        obj.title = safe_unicode(title)
178
        # notify that the object was created
179
        notify(ObjectCreatedEvent(obj))
180
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
181
        # ContainerModifiedEvent
182
        container._setObject(id, obj)
183
        # we get the object here with the current object id, as it might be
184
        # renamed already by an event handler
185
        obj = container._getOb(obj.getId())
186
187
    return obj
188
189
190
def edit(obj, check_permissions=True, **kwargs):
191
    """Updates the values of object fields with the new values passed-in
192
    """
193
    # Prevent circular dependencies
194
    from security import check_permission
195
    fields = get_fields(obj)
196
    for name, value in kwargs.items():
197
        field = fields.get(name, None)
198
        if not field:
199
            continue
200
201
        # cannot update readonly fields
202
        readonly = getattr(field, "readonly", False)
203
        if readonly:
204
            raise ValueError("Field '{}' is readonly".format(name))
205
206
        # check field writable permission
207
        if check_permissions:
208
            perm = getattr(field, "write_permission", ModifyPortalContent)
209
            if perm and not check_permission(perm, obj):
210
                raise Unauthorized("Field '{}' is not writeable".format(name))
211
212
        # Set the value
213
        if hasattr(field, "getMutator"):
214
            mutator = field.getMutator(obj)
215
            mapply(mutator, value)
216
        else:
217
            field.set(obj, value)
218
219
220
def get_tool(name, context=None, default=_marker):
221
    """Get a portal tool by name
222
223
    :param name: The name of the tool, e.g. `portal_catalog`
224
    :type name: string
225
    :param context: A portal object
226
    :type context: ATContentType/DexterityContentType/CatalogBrain
227
    :returns: Portal Tool
228
    """
229
230
    # Try first with the context
231
    if context is not None:
232
        try:
233
            context = get_object(context)
234
            return getToolByName(context, name)
235
        except (APIError, AttributeError) as e:
236
            # https://github.com/senaite/bika.lims/issues/396
237
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
238
                        "-> falling back to plone.api.portal.get_tool('{}')"
239
                        .format(repr(context), name, repr(e), name))
240
            return get_tool(name, default=default)
241
242
    # Try with the plone api
243
    try:
244
        return ploneapi.portal.get_tool(name)
245
    except InvalidParameterError:
246
        if default is not _marker:
247
            if isinstance(default, six.string_types):
248
                return get_tool(default)
249
            return default
250
        fail("No tool named '%s' found." % name)
251
252
253
def fail(msg=None):
254
    """API LIMS Error
255
    """
256
    if msg is None:
257
        msg = "Reason not given."
258
    raise APIError("{}".format(msg))
259
260
261
def is_object(brain_or_object):
262
    """Check if the passed in object is a supported portal content object
263
264
    :param brain_or_object: A single catalog brain or content object
265
    :type brain_or_object: Portal Object
266
    :returns: True if the passed in object is a valid portal content
267
    """
268
    if is_portal(brain_or_object):
269
        return True
270
    if is_supermodel(brain_or_object):
271
        return True
272
    if is_at_content(brain_or_object):
273
        return True
274
    if is_dexterity_content(brain_or_object):
275
        return True
276
    if is_brain(brain_or_object):
277
        return True
278
    return False
279
280
281
def get_object(brain_object_uid, default=_marker):
282
    """Get the full content object
283
284
    :param brain_object_uid: A catalog brain or content object or uid
285
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
286
    /CatalogBrain/basestring
287
    :returns: The full object
288
    """
289
    if is_uid(brain_object_uid):
290
        return get_object_by_uid(brain_object_uid)
291
    elif is_supermodel(brain_object_uid):
292
        return brain_object_uid.instance
293
    if not is_object(brain_object_uid):
294
        if default is _marker:
295
            fail("{} is not supported.".format(repr(brain_object_uid)))
296
        return default
297
    if is_brain(brain_object_uid):
298
        return brain_object_uid.getObject()
299
    return brain_object_uid
300
301
302
def is_portal(brain_or_object):
303
    """Checks if the passed in object is the portal root object
304
305
    :param brain_or_object: A single catalog brain or content object
306
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
307
    :returns: True if the object is the portal root object
308
    :rtype: bool
309
    """
310
    return ISiteRoot.providedBy(brain_or_object)
311
312
313
def is_brain(brain_or_object):
314
    """Checks if the passed in object is a portal catalog brain
315
316
    :param brain_or_object: A single catalog brain or content object
317
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
318
    :returns: True if the object is a catalog brain
319
    :rtype: bool
320
    """
321
    return ICatalogBrain.providedBy(brain_or_object)
322
323
324
def is_supermodel(brain_or_object):
325
    """Checks if the passed in object is a supermodel
326
327
    :param brain_or_object: A single catalog brain or content object
328
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
329
    :returns: True if the object is a catalog brain
330
    :rtype: bool
331
    """
332
    # avoid circular imports
333
    from senaite.app.supermodel.interfaces import ISuperModel
334
    return ISuperModel.providedBy(brain_or_object)
335
336
337
def is_dexterity_content(brain_or_object):
338
    """Checks if the passed in object is a dexterity content type
339
340
    :param brain_or_object: A single catalog brain or content object
341
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
342
    :returns: True if the object is a dexterity content type
343
    :rtype: bool
344
    """
345
    return IDexterityContent.providedBy(brain_or_object)
346
347
348
def is_at_content(brain_or_object):
349
    """Checks if the passed in object is an AT content type
350
351
    :param brain_or_object: A single catalog brain or content object
352
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
353
    :returns: True if the object is an AT content type
354
    :rtype: bool
355
    """
356
    return isinstance(brain_or_object, BaseObject)
357
358
359
def is_folderish(brain_or_object):
360
    """Checks if the passed in object is folderish
361
362
    :param brain_or_object: A single catalog brain or content object
363
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
364
    :returns: True if the object is folderish
365
    :rtype: bool
366
    """
367
    if hasattr(brain_or_object, "is_folderish"):
368
        if callable(brain_or_object.is_folderish):
369
            return brain_or_object.is_folderish()
370
        return brain_or_object.is_folderish
371
    return IFolderish.providedBy(get_object(brain_or_object))
372
373
374
def get_portal_type(brain_or_object):
375
    """Get the portal type for this object
376
377
    :param brain_or_object: A single catalog brain or content object
378
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
379
    :returns: Portal type
380
    :rtype: string
381
    """
382
    if not is_object(brain_or_object):
383
        fail("{} is not supported.".format(repr(brain_or_object)))
384
    return brain_or_object.portal_type
385
386
387
def get_schema(brain_or_object):
388
    """Get the schema of the content
389
390
    :param brain_or_object: A single catalog brain or content object
391
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
392
    :returns: Schema object
393
    """
394
    obj = get_object(brain_or_object)
395
    if is_portal(obj):
396
        fail("get_schema can't return schema of portal root")
397
    if is_dexterity_content(obj):
398
        pt = get_tool("portal_types")
399
        fti = pt.getTypeInfo(obj.portal_type)
400
        return fti.lookupSchema()
401
    if is_at_content(obj):
402
        return obj.Schema()
403
    fail("{} has no Schema.".format(brain_or_object))
404
405
406
def get_fields(brain_or_object):
407
    """Get a name to field mapping of the object
408
409
    :param brain_or_object: A single catalog brain or content object
410
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
411
    :returns: Mapping of name -> field
412
    :rtype: OrderedDict
413
    """
414
    obj = get_object(brain_or_object)
415
    schema = get_schema(obj)
416
    if is_dexterity_content(obj):
417
        # get the fields directly provided by the interface
418
        fields = getFieldsInOrder(schema)
419
        # append the fields coming from behaviors
420
        behavior_assignable = IBehaviorAssignable(obj)
421
        if behavior_assignable:
422
            behaviors = behavior_assignable.enumerateBehaviors()
423
            for behavior in behaviors:
424
                fields.extend(getFieldsInOrder(behavior.interface))
425
        return OrderedDict(fields)
426
    return OrderedDict(schema)
427
428
429
def get_id(brain_or_object):
430
    """Get the Plone ID for this object
431
432
    :param brain_or_object: A single catalog brain or content object
433
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
434
    :returns: Plone ID
435
    :rtype: string
436
    """
437
    if is_brain(brain_or_object):
438
        if base_hasattr(brain_or_object, "getId"):
439
            return brain_or_object.getId
440
        if base_hasattr(brain_or_object, "id"):
441
            return brain_or_object.id
442
    return get_object(brain_or_object).getId()
443
444
445
def get_title(brain_or_object):
446
    """Get the Title for this object
447
448
    :param brain_or_object: A single catalog brain or content object
449
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
450
    :returns: Title
451
    :rtype: string
452
    """
453
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
454
        return brain_or_object.Title
455
    return get_object(brain_or_object).Title()
456
457
458
def get_description(brain_or_object):
459
    """Get the Title for this object
460
461
    :param brain_or_object: A single catalog brain or content object
462
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
463
    :returns: Title
464
    :rtype: string
465
    """
466
    if is_brain(brain_or_object) \
467
            and base_hasattr(brain_or_object, "Description"):
468
        return brain_or_object.Description
469
    return get_object(brain_or_object).Description()
470
471
472
def get_uid(brain_or_object):
473
    """Get the Plone UID for this object
474
475
    :param brain_or_object: A single catalog brain or content object or an UID
476
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
477
    :returns: Plone UID
478
    :rtype: string
479
    """
480
    if is_uid(brain_or_object):
481
        return brain_or_object
482
    if is_portal(brain_or_object):
483
        return '0'
484
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
485
        return brain_or_object.UID
486
    return get_object(brain_or_object).UID()
487
488
489
def get_url(brain_or_object):
490
    """Get the absolute URL for this object
491
492
    :param brain_or_object: A single catalog brain or content object
493
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
494
    :returns: Absolute URL
495
    :rtype: string
496
    """
497
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
498
        return brain_or_object.getURL()
499
    return get_object(brain_or_object).absolute_url()
500
501
502
def get_icon(brain_or_object, html_tag=True):
503
    """Get the icon of the content object
504
505
    :param brain_or_object: A single catalog brain or content object
506
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
507
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
508
    :type html_tag: bool
509
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
510
    :rtype: string
511
    """
512
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
513
    # work for Contents coming from other catalogs than the
514
    # `portal_catalog`
515
    portal_types = get_tool("portal_types")
516
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
517
    icon = fti.getIcon()
518
    if not icon:
519
        return ""
520
    url = "%s/%s" % (get_url(get_portal()), icon)
521
    if not html_tag:
522
        return url
523
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
524
        url=url, title=get_title(brain_or_object))
525
    return tag
526
527
528
def get_object_by_uid(uid, default=_marker):
529
    """Find an object by a given UID
530
531
    :param uid: The UID of the object to find
532
    :type uid: string
533
    :returns: Found Object or None
534
    """
535
536
    # nothing to do here
537
    if not uid:
538
        if default is not _marker:
539
            return default
540
        fail("get_object_by_uid requires UID as first argument; got {} instead"
541
             .format(uid))
542
543
    # we defined the portal object UID to be '0'::
544
    if uid == '0':
545
        return get_portal()
546
547
    brain = get_brain_by_uid(uid)
548
549
    if brain is None:
550
        if default is not _marker:
551
            return default
552
        fail("No object found for UID {}".format(uid))
553
554
    return get_object(brain)
555
556
557
def get_brain_by_uid(uid, default=None):
558
    """Query a brain by a given UID
559
560
    :param uid: The UID of the object to find
561
    :type uid: string
562
    :returns: ZCatalog brain or None
563
    """
564
    if not is_uid(uid):
565
        return default
566
567
    # we try to find the object with the UID catalog
568
    uc = get_tool("uid_catalog")
569
570
    # try to find the object with the reference catalog first
571
    brains = uc(UID=uid)
572
    if len(brains) != 1:
573
        return default
574
    return brains[0]
575
576
577
def get_object_by_path(path, default=_marker):
578
    """Find an object by a given physical path or absolute_url
579
580
    :param path: The physical path of the object to find
581
    :type path: string
582
    :returns: Found Object or None
583
    """
584
585
    # nothing to do here
586
    if not path:
587
        if default is not _marker:
588
            return default
589
        fail("get_object_by_path first argument must be a path; {} received"
590
             .format(path))
591
592
    portal = get_portal()
593
    portal_path = get_path(portal)
594
    portal_url = get_url(portal)
595
596
    # ensure we have a physical path
597
    if path.startswith(portal_url):
598
        request = get_request()
599
        path = "/".join(request.physicalPathFromURL(path))
600
601
    if not path.startswith(portal_path):
602
        if default is not _marker:
603
            return default
604
        fail("Not a physical path inside the portal.")
605
606
    if path == portal_path:
607
        return portal
608
609
    return portal.restrictedTraverse(path, default)
610
611
612
def get_path(brain_or_object):
613
    """Calculate the physical path of this object
614
615
    :param brain_or_object: A single catalog brain or content object
616
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
617
    :returns: Physical path of the object
618
    :rtype: string
619
    """
620
    if is_brain(brain_or_object):
621
        return brain_or_object.getPath()
622
    return "/".join(get_object(brain_or_object).getPhysicalPath())
623
624
625
def get_parent_path(brain_or_object):
626
    """Calculate the physical parent path of this object
627
628
    :param brain_or_object: A single catalog brain or content object
629
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
630
    :returns: Physical path of the parent object
631
    :rtype: string
632
    """
633
    if is_portal(brain_or_object):
634
        return get_path(get_portal())
635
    if is_brain(brain_or_object):
636
        path = get_path(brain_or_object)
637
        return path.rpartition("/")[0]
638
    return get_path(get_object(brain_or_object).aq_parent)
639
640
641
def get_parent(brain_or_object, catalog_search=False):
642
    """Locate the parent object of the content/catalog brain
643
644
    The `catalog_search` switch uses the `portal_catalog` to do a search return
645
    a brain instead of the full parent object. However, if the search returned
646
    no results, it falls back to return the full parent object.
647
648
    :param brain_or_object: A single catalog brain or content object
649
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
650
    :param catalog_search: Use a catalog query to find the parent object
651
    :type catalog_search: bool
652
    :returns: parent object
653
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
654
    """
655
656
    if is_portal(brain_or_object):
657
        return get_portal()
658
659
    # Do a catalog search and return the brain
660
    if catalog_search:
661
        parent_path = get_parent_path(brain_or_object)
662
663
        # parent is the portal object
664
        if parent_path == get_path(get_portal()):
665
            return get_portal()
666
667
        # get the catalog tool
668
        pc = get_portal_catalog()
669
670
        # query for the parent path
671
        results = pc(path={
672
            "query": parent_path,
673
            "depth": 0})
674
675
        # No results fallback: return the parent object
676
        if not results:
677
            return get_object(brain_or_object).aq_parent
678
679
        # return the brain
680
        return results[0]
681
682
    return get_object(brain_or_object).aq_parent
683
684
685
def search(query, catalog=_marker):
686
    """Search for objects.
687
688
    :param query: A suitable search query.
689
    :type query: dict
690
    :param catalog: A single catalog id or a list of catalog ids
691
    :type catalog: str/list
692
    :returns: Search results
693
    :rtype: List of ZCatalog brains
694
    """
695
696
    # query needs to be a dictionary
697
    if not isinstance(query, dict):
698
        fail("Catalog query needs to be a dictionary")
699
700
    # Portal types to query
701
    portal_types = query.get("portal_type", [])
702
    # We want the portal_type as a list
703
    if not isinstance(portal_types, (tuple, list)):
704
        portal_types = [portal_types]
705
706
    # The catalogs used for the query
707
    catalogs = []
708
709
    # The user did **not** specify a catalog
710
    if catalog is _marker:
711
        # Find the registered catalogs for the queried portal types
712
        for portal_type in portal_types:
713
            # Just get the first registered/default catalog
714
            catalogs.append(get_catalogs_for(
715
                portal_type, default="portal_catalog")[0])
716
    else:
717
        # User defined catalogs
718
        if isinstance(catalog, (list, tuple)):
719
            catalogs.extend(map(get_tool, catalog))
720
        else:
721
            catalogs.append(get_tool(catalog))
722
723
    # Cleanup: Avoid duplicate catalogs
724
    catalogs = list(set(catalogs)) or [get_portal_catalog()]
725
726
    # We only support **single** catalog queries
727
    if len(catalogs) > 1:
728
        fail("Multi Catalog Queries are not supported!")
729
730
    return catalogs[0](query)
731
732
733
def safe_getattr(brain_or_object, attr, default=_marker):
734
    """Return the attribute value
735
736
    :param brain_or_object: A single catalog brain or content object
737
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
738
    :param attr: Attribute name
739
    :type attr: str
740
    :returns: Attribute value
741
    :rtype: obj
742
    """
743
    try:
744
        value = getattr(brain_or_object, attr, _marker)
745
        if value is _marker:
746
            if default is not _marker:
747
                return default
748
            fail("Attribute '{}' not found.".format(attr))
749
        if callable(value):
750
            return value()
751
        return value
752
    except Unauthorized:
753
        if default is not _marker:
754
            return default
755
        fail("You are not authorized to access '{}' of '{}'.".format(
756
            attr, repr(brain_or_object)))
757
758
759
def get_portal_catalog():
760
    """Get the portal catalog tool
761
762
    :returns: Portal Catalog Tool
763
    """
764
    return get_tool("portal_catalog")
765
766
767
def get_review_history(brain_or_object, rev=True):
768
    """Get the review history for the given brain or context.
769
770
    :param brain_or_object: A single catalog brain or content object
771
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
772
    :returns: Workflow history
773
    :rtype: [{}, ...]
774
    """
775
    obj = get_object(brain_or_object)
776
    review_history = []
777
    try:
778
        workflow = get_tool("portal_workflow")
779
        review_history = workflow.getInfoFor(obj, 'review_history')
780
    except WorkflowException as e:
781
        message = str(e)
782
        logger.error("Cannot retrieve review_history on {}: {}".format(
783
            obj, message))
784
    if not isinstance(review_history, (list, tuple)):
785
        logger.error("get_review_history: expected list, received {}".format(
786
            review_history))
787
        review_history = []
788
789
    if isinstance(review_history, tuple):
790
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
791
        # tuple when "review_history" is passed in:
792
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
793
        #
794
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
795
        # Expression.StateChangeInfo, that always returns a list, except when no
796
        # review_history is found:
797
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
798
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
799
        review_history = list(review_history)
800
801
    if rev is True:
802
        review_history.reverse()
803
    return review_history
804
805
806
def get_revision_history(brain_or_object):
807
    """Get the revision history for the given brain or context.
808
809
    :param brain_or_object: A single catalog brain or content object
810
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
811
    :returns: Workflow history
812
    :rtype: obj
813
    """
814
    obj = get_object(brain_or_object)
815
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
816
    return chv.fullHistory()
817
818
819
def get_workflows_for(brain_or_object):
820
    """Get the assigned workflows for the given brain or context.
821
822
    Note: This function supports also the portal_type as parameter.
823
824
    :param brain_or_object: A single catalog brain or content object
825
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
826
    :returns: Assigned Workflows
827
    :rtype: tuple
828
    """
829
    workflow = ploneapi.portal.get_tool("portal_workflow")
830
    if isinstance(brain_or_object, six.string_types):
831
        return workflow.getChainFor(brain_or_object)
832
    obj = get_object(brain_or_object)
833
    return workflow.getChainFor(obj)
834
835
836
def get_workflow_status_of(brain_or_object, state_var="review_state"):
837
    """Get the current workflow status of the given brain or context.
838
839
    :param brain_or_object: A single catalog brain or content object
840
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
841
    :param state_var: The name of the state variable
842
    :type state_var: string
843
    :returns: Status
844
    :rtype: str
845
    """
846
    # Try to get the state from the catalog brain first
847
    if is_brain(brain_or_object):
848
        if state_var in brain_or_object.schema():
849
            return brain_or_object[state_var]
850
851
    # Retrieve the sate from the object
852
    workflow = get_tool("portal_workflow")
853
    obj = get_object(brain_or_object)
854
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
855
856
857
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
858
    """Get the previous workflow status of the object
859
860
    :param brain_or_object: A single catalog brain or content object
861
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
862
    :param skip: Workflow states to skip
863
    :type skip: tuple/list
864
    :returns: status
865
    :rtype: str
866
    """
867
868
    skip = isinstance(skip, (list, tuple)) and skip or []
869
    history = get_review_history(brain_or_object)
870
871
    # Remove consecutive duplicates, some transitions might happen more than
872
    # once consecutively (e.g. publish)
873
    history = map(lambda i: i[0], groupby(history))
874
875
    for num, item in enumerate(history):
876
        # skip the current history entry
877
        if num == 0:
878
            continue
879
        status = item.get("review_state")
880
        if status in skip:
881
            continue
882
        return status
883
    return default
884
885
886
def get_creation_date(brain_or_object):
887
    """Get the creation date of the brain or object
888
889
    :param brain_or_object: A single catalog brain or content object
890
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
891
    :returns: Creation date
892
    :rtype: DateTime
893
    """
894
    created = getattr(brain_or_object, "created", None)
895
    if created is None:
896
        fail("Object {} has no creation date ".format(
897
             repr(brain_or_object)))
898
    if callable(created):
899
        return created()
900
    return created
901
902
903
def get_modification_date(brain_or_object):
904
    """Get the modification date of the brain or object
905
906
    :param brain_or_object: A single catalog brain or content object
907
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
908
    :returns: Modification date
909
    :rtype: DateTime
910
    """
911
    modified = getattr(brain_or_object, "modified", None)
912
    if modified is None:
913
        fail("Object {} has no modification date ".format(
914
             repr(brain_or_object)))
915
    if callable(modified):
916
        return modified()
917
    return modified
918
919
920
def get_review_status(brain_or_object):
921
    """Get the `review_state` of an object
922
923
    :param brain_or_object: A single catalog brain or content object
924
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
925
    :returns: Value of the review_status variable
926
    :rtype: String
927
    """
928
    if is_brain(brain_or_object):
929
        return brain_or_object.review_state
930
    return get_workflow_status_of(brain_or_object, state_var="review_state")
931
932
933
def is_active(brain_or_object):
934
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
935
936
    :param brain_or_object: A single catalog brain or content object
937
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
938
    :returns: False if the object is in the state 'inactive' or 'cancelled'
939
    :rtype: bool
940
    """
941
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
942
        return False
943
    return True
944
945
946
def get_catalogs_for(brain_or_object, default="portal_catalog"):
947
    """Get all registered catalogs for the given portal_type, catalog brain or
948
    content object
949
950
    :param brain_or_object: The portal_type, a catalog brain or content object
951
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
952
    :returns: List of supported catalogs
953
    :rtype: list
954
    """
955
    archetype_tool = get_tool("archetype_tool", default=None)
956
    if archetype_tool is None:
957
        # return the default catalog
958
        return [get_tool(default, default="portal_catalog")]
959
960
    catalogs = []
961
962
    # get the registered catalogs for portal_type
963
    if is_object(brain_or_object):
964
        catalogs = archetype_tool.getCatalogsByType(
965
            get_portal_type(brain_or_object))
966
    if isinstance(brain_or_object, six.string_types):
967
        catalogs = archetype_tool.getCatalogsByType(brain_or_object)
968
969
    if not catalogs:
970
        return [get_tool(default)]
971
    return catalogs
972
973
974
def get_transitions_for(brain_or_object):
975
    """List available workflow transitions for all workflows
976
977
    :param brain_or_object: A single catalog brain or content object
978
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
979
    :returns: All possible available and allowed transitions
980
    :rtype: list[dict]
981
    """
982
    workflow = get_tool('portal_workflow')
983
    transitions = []
984
    instance = get_object(brain_or_object)
985
    for wfid in get_workflows_for(brain_or_object):
986
        wf = workflow[wfid]
987
        tlist = wf.getTransitionsFor(instance)
988
        transitions.extend([t for t in tlist if t not in transitions])
989
    return transitions
990
991
992
def do_transition_for(brain_or_object, transition):
993
    """Performs a workflow transition for the passed in object.
994
995
    :param brain_or_object: A single catalog brain or content object
996
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
997
    :returns: The object where the transtion was performed
998
    """
999
    if not isinstance(transition, six.string_types):
1000
        fail("Transition type needs to be string, got '%s'" % type(transition))
1001
    obj = get_object(brain_or_object)
1002
    try:
1003
        ploneapi.content.transition(obj, transition)
1004
    except ploneapi.exc.InvalidParameterError as e:
1005
        fail("Failed to perform transition '{}' on {}: {}".format(
1006
             transition, obj, str(e)))
1007
    return obj
1008
1009
1010
def get_roles_for_permission(permission, brain_or_object):
1011
    """Get a list of granted roles for the given permission on the object.
1012
1013
    :param brain_or_object: A single catalog brain or content object
1014
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1015
    :returns: Roles for the given Permission
1016
    :rtype: list
1017
    """
1018
    obj = get_object(brain_or_object)
1019
    allowed = set(rolesForPermissionOn(permission, obj))
1020
    return sorted(allowed)
1021
1022
1023
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
1024
    """Checks if the passed in object is versionable.
1025
1026
    :param brain_or_object: A single catalog brain or content object
1027
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1028
    :returns: True if the object is versionable
1029
    :rtype: bool
1030
    """
1031
    pr = get_tool("portal_repository")
1032
    obj = get_object(brain_or_object)
1033
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
1034
        and pr.isVersionable(obj)
1035
1036
1037
def get_version(brain_or_object):
1038
    """Get the version of the current object
1039
1040
    :param brain_or_object: A single catalog brain or content object
1041
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1042
    :returns: The current version of the object, or None if not available
1043
    :rtype: int or None
1044
    """
1045
    obj = get_object(brain_or_object)
1046
    if not is_versionable(obj):
1047
        return None
1048
    return getattr(aq_base(obj), "version_id", 0)
1049
1050
1051
def get_view(name, context=None, request=None, default=None):
1052
    """Get the view by name
1053
1054
    :param name: The name of the view
1055
    :type name: str
1056
    :param context: The context to query the view
1057
    :type context: ATContentType/DexterityContentType/CatalogBrain
1058
    :param request: The request to query the view
1059
    :type request: HTTPRequest object
1060
    :returns: HTTP Request
1061
    :rtype: Products.Five.metaclass View object
1062
    """
1063
    context = context or get_portal()
1064
    request = request or get_request() or None
1065
    view = queryMultiAdapter((get_object(context), request), name=name)
1066
    if view is None:
1067
        return default
1068
    return view
1069
1070
1071
def get_request():
1072
    """Get the global request object
1073
1074
    :returns: HTTP Request
1075
    :rtype: HTTPRequest object
1076
    """
1077
    return globalrequest.getRequest()
1078
1079
1080
def get_test_request():
1081
    """Get the TestRequest object
1082
    """
1083
    request = TestRequest()
1084
    directlyProvides(request, IAttributeAnnotatable)
1085
    return request
1086
1087
1088
def get_group(group_or_groupname):
1089
    """Return Plone Group
1090
1091
    :param group_or_groupname: Plone group or the name of the group
1092
    :type groupname:  GroupData/str
1093
    :returns: Plone GroupData
1094
    """
1095
    if not group_or_groupname:
1096
1097
        return None
1098
    if hasattr(group_or_groupname, "_getGroup"):
1099
        return group_or_groupname
1100
    gtool = get_tool("portal_groups")
1101
    return gtool.getGroupById(group_or_groupname)
1102
1103
1104
def get_user(user_or_username):
1105
    """Return Plone User
1106
1107
    :param user_or_username: Plone user or user id
1108
    :returns: Plone MemberData
1109
    """
1110
    user = None
1111
    if isinstance(user_or_username, MemberData):
1112
        user = user_or_username
1113
    if isinstance(user_or_username, six.string_types):
1114
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1115
    return user
1116
1117
1118
def get_user_properties(user_or_username):
1119
    """Return User Properties
1120
1121
    :param user_or_username: Plone group identifier
1122
    :returns: Plone MemberData
1123
    """
1124
    user = get_user(user_or_username)
1125
    if user is None:
1126
        return {}
1127
    if not callable(user.getUser):
1128
        return {}
1129
    out = {}
1130
    plone_user = user.getUser()
1131
    for sheet in plone_user.listPropertysheets():
1132
        ps = plone_user.getPropertysheet(sheet)
1133
        out.update(dict(ps.propertyItems()))
1134
    return out
1135
1136
1137
def get_users_by_roles(roles=None):
1138
    """Search Plone users by their roles
1139
1140
    :param roles: Plone role name or list of roles
1141
    :type roles:  list/str
1142
    :returns: List of Plone users having the role(s)
1143
    """
1144
    if not isinstance(roles, (tuple, list)):
1145
        roles = [roles]
1146
    mtool = get_tool("portal_membership")
1147
    return mtool.searchForMembers(roles=roles)
1148
1149
1150
def get_current_user():
1151
    """Returns the current logged in user
1152
1153
    :returns: Current User
1154
    """
1155
    return ploneapi.user.get_current()
1156
1157
1158
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1159
    """Returns the associated contact of a Plone user
1160
1161
    If the user passed in has no contact associated, return None.
1162
    The `contact_types` parameter filter the portal types for the search.
1163
1164
    :param: Plone user
1165
    :contact_types: List with the contact portal types to search
1166
    :returns: Contact associated to the Plone user or None
1167
    """
1168
    if not user:
1169
        return None
1170
1171
    query = {'portal_type': contact_types, 'getUsername': user.id}
1172
    brains = search(query, catalog='portal_catalog')
1173
    if not brains:
1174
        return None
1175
1176
    if len(brains) > 1:
1177
        # Oops, the user has multiple contacts assigned, return None
1178
        contacts = map(lambda c: c.Title, brains)
1179
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1180
        err_msg = err_msg.format(user.id, ','.join(contacts))
1181
        logger.error(err_msg)
1182
        return None
1183
1184
    return get_object(brains[0])
1185
1186
1187
def get_user_client(user_or_contact):
1188
    """Returns the client of the contact of a Plone user
1189
1190
    If the user passed in has no contact or does not belong to any client,
1191
    returns None.
1192
1193
    :param: Plone user or contact
1194
    :returns: Client the contact of the Plone user belongs to
1195
    """
1196
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1197
        # Lab contacts cannot belong to a client
1198
        return None
1199
1200
    if not IContact.providedBy(user_or_contact):
1201
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1202
        if IContact.providedBy(contact):
1203
            return get_user_client(contact)
1204
        return None
1205
1206
    client = get_parent(user_or_contact)
1207
    if client and IClient.providedBy(client):
1208
        return client
1209
1210
    return None
1211
1212
1213
def get_current_client():
1214
    """Returns the current client the current logged in user belongs to, if any
1215
1216
    :returns: Client the current logged in user belongs to or None
1217
    """
1218
    return get_user_client(get_current_user())
1219
1220
1221
def get_cache_key(brain_or_object):
1222
    """Generate a cache key for a common brain or object
1223
1224
    :param brain_or_object: A single catalog brain or content object
1225
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1226
    :returns: Cache Key
1227
    :rtype: str
1228
    """
1229
    key = [
1230
        get_portal_type(brain_or_object),
1231
        get_id(brain_or_object),
1232
        get_uid(brain_or_object),
1233
        # handle different domains gracefully
1234
        get_url(brain_or_object),
1235
        # Return the microsecond since the epoch in GMT
1236
        get_modification_date(brain_or_object).micros(),
1237
    ]
1238
    return "-".join(map(lambda x: str(x), key))
1239
1240
1241
def bika_cache_key_decorator(method, self, brain_or_object):
1242
    """Bika cache key decorator usable for
1243
1244
    :param brain_or_object: A single catalog brain or content object
1245
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1246
    :returns: Cache Key
1247
    :rtype: str
1248
    """
1249
    if brain_or_object is None:
1250
        raise DontCache
1251
    return get_cache_key(brain_or_object)
1252
1253
1254
def normalize_id(string):
1255
    """Normalize the id
1256
1257
    :param string: A string to normalize
1258
    :type string: str
1259
    :returns: Normalized ID
1260
    :rtype: str
1261
    """
1262
    if not isinstance(string, six.string_types):
1263
        fail("Type of argument must be string, found '{}'"
1264
             .format(type(string)))
1265
    # get the id nomalizer utility
1266
    normalizer = getUtility(IIDNormalizer).normalize
1267
    return normalizer(string)
1268
1269
1270
def normalize_filename(string):
1271
    """Normalize the filename
1272
1273
    :param string: A string to normalize
1274
    :type string: str
1275
    :returns: Normalized ID
1276
    :rtype: str
1277
    """
1278
    if not isinstance(string, six.string_types):
1279
        fail("Type of argument must be string, found '{}'"
1280
             .format(type(string)))
1281
    # get the file nomalizer utility
1282
    normalizer = getUtility(IFileNameNormalizer).normalize
1283
    return normalizer(string)
1284
1285
1286
def is_uid(uid, validate=False):
1287
    """Checks if the passed in uid is a valid UID
1288
1289
    :param uid: The uid to check
1290
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1291
    True, also verifies if a brain exists for the uid passed in
1292
    :type uid: string
1293
    :return: True if a valid uid
1294
    :rtype: bool
1295
    """
1296
    if not isinstance(uid, six.string_types):
1297
        return False
1298
    if uid == '0':
1299
        return True
1300
    if len(uid) != 32:
1301
        return False
1302
    if not UID_RX.match(uid):
1303
        return False
1304
    if not validate:
1305
        return True
1306
1307
    # Check if a brain for this uid exists
1308
    uc = get_tool('uid_catalog')
1309
    brains = uc(UID=uid)
1310
    if brains:
1311
        assert (len(brains) == 1)
1312
    return len(brains) > 0
1313
1314
1315
def is_date(date):
1316
    """Checks if the passed in value is a valid Zope's DateTime
1317
1318
    :param date: The date to check
1319
    :type date: DateTime
1320
    :return: True if a valid date
1321
    :rtype: bool
1322
    """
1323
    if not date:
1324
        return False
1325
    return isinstance(date, (DateTime, datetime))
1326
1327
1328
def to_date(value, default=None):
1329
    """Tries to convert the passed in value to Zope's DateTime
1330
1331
    :param value: The value to be converted to a valid DateTime
1332
    :type value: str, DateTime or datetime
1333
    :return: The DateTime representation of the value passed in or default
1334
    """
1335
    if isinstance(value, DateTime):
1336
        return value
1337
    if not value:
1338
        if default is None:
1339
            return None
1340
        return to_date(default)
1341
    try:
1342
        if isinstance(value, str) and '.' in value:
1343
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
1344
            return DateTime(value, datefmt='international')
1345
        return DateTime(value)
1346
    except (TypeError, ValueError, DateTimeError):
1347
        return to_date(default)
1348
1349
1350
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1351
               round_to_int=True):
1352
    """Returns the computed total number of minutes
1353
    """
1354
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1355
        float(seconds)/60 + float(milliseconds)/1000/60
1356
    return int(round(total)) if round_to_int else total
1357
1358
1359
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1360
    """Returns a representation of time in a string in xd yh zm format
1361
    """
1362
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1363
                         seconds=seconds, milliseconds=milliseconds)
1364
    delta = timedelta(minutes=int(round(minutes)))
1365
    d = delta.days
1366
    h = delta.seconds // 3600
1367
    m = (delta.seconds // 60) % 60
1368
    m = m and "{}m ".format(str(m)) or ""
1369
    d = d and "{}d ".format(str(d)) or ""
1370
    if m and d:
1371
        h = "{}h ".format(str(h))
1372
    else:
1373
        h = h and "{}h ".format(str(h)) or ""
1374
    return "".join([d, h, m]).strip()
1375
1376
1377
def to_int(value, default=_marker):
1378
    """Tries to convert the value to int.
1379
    Truncates at the decimal point if the value is a float
1380
1381
    :param value: The value to be converted to an int
1382
    :return: The resulting int or default
1383
    """
1384
    if is_floatable(value):
1385
        value = to_float(value)
1386
    try:
1387
        return int(value)
1388
    except (TypeError, ValueError):
1389
        if default is None:
1390
            return default
1391
        if default is not _marker:
1392
            return to_int(default)
1393
        fail("Value %s cannot be converted to int" % repr(value))
1394
1395
1396
def is_floatable(value):
1397
    """Checks if the passed in value is a valid floatable number
1398
1399
    :param value: The value to be evaluated as a float number
1400
    :type value: str, float, int
1401
    :returns: True if is a valid float number
1402
    :rtype: bool"""
1403
    try:
1404
        float(value)
1405
        return True
1406
    except (TypeError, ValueError):
1407
        return False
1408
1409
1410
def to_float(value, default=_marker):
1411
    """Converts the passed in value to a float number
1412
1413
    :param value: The value to be converted to a floatable number
1414
    :type value: str, float, int
1415
    :returns: The float number representation of the passed in value
1416
    :rtype: float
1417
    """
1418
    if not is_floatable(value):
1419
        if default is not _marker:
1420
            return to_float(default)
1421
        fail("Value %s is not floatable" % repr(value))
1422
    return float(value)
1423
1424
1425
def float_to_string(value, default=_marker):
1426
    """Convert a float value to string without exponential notation
1427
1428
    This function preserves the whole fraction
1429
1430
    :param value: The float value to be converted to a string
1431
    :type value: str, float, int
1432
    :returns: String representation of the float w/o exponential notation
1433
    :rtype: str
1434
    """
1435
    if not is_floatable(value):
1436
        if default is not _marker:
1437
            return default
1438
        fail("Value %s is not floatable" % repr(value))
1439
1440
    # Leave floatable string values unchanged
1441
    if isinstance(value, six.string_types):
1442
        return value
1443
1444
    value = float(value)
1445
    str_value = str(value)
1446
1447
    if "." in str_value:
1448
        # might be something like 1.23e-26
1449
        front, back = str_value.split(".")
1450
    else:
1451
        # or 1e-07 for 0.0000001
1452
        back = str_value
1453
1454
    if "e-" in back:
1455
        fraction, zeros = back.split("e-")
1456
        # we want to cover the faction and the zeros
1457
        precision = len(fraction) + int(zeros)
1458
        template = "{:.%df}" % precision
1459
        str_value = template.format(value)
1460
    elif "e+" in back:
1461
        # positive numbers, e.g. 1e+16 don't need a fractional part
1462
        str_value = "{:.0f}".format(value)
1463
1464
    # cut off trailing zeros
1465
    if "." in str_value:
1466
        str_value = str_value.rstrip("0").rstrip(".")
1467
1468
    return str_value
1469
1470
1471
def to_searchable_text_metadata(value):
1472
    """Parse the given metadata value to searchable text
1473
1474
    :param value: The raw value of the metadata column
1475
    :returns: Searchable and translated unicode value or None
1476
    """
1477
    if not value:
1478
        return u""
1479
    if value is Missing.Value:
1480
        return u""
1481
    if is_uid(value):
1482
        return u""
1483
    if isinstance(value, (bool)):
1484
        return u""
1485
    if isinstance(value, (list, tuple)):
1486
        values = map(to_searchable_text_metadata, value)
1487
        values = filter(None, values)
1488
        return " ".join(values)
1489
    if isinstance(value, dict):
1490
        return to_searchable_text_metadata(value.values())
1491
    if is_date(value):
1492
        from senaite.core.api.dtime import date_to_string
1493
        return date_to_string(value, "%Y-%m-%d")
1494
    if is_at_content(value):
1495
        return to_searchable_text_metadata(get_title(value))
1496
    if not isinstance(value, six.string_types):
1497
        value = str(value)
1498
    return safe_unicode(value)
1499
1500
1501
def get_registry_record(name, default=None):
1502
    """Returns the value of a registry record
1503
1504
    :param name: [required] name of the registry record
1505
    :type name: str
1506
    :param default: The value returned if the record is not found
1507
    :type default: anything
1508
    :returns: value of the registry record
1509
    """
1510
    return ploneapi.portal.get_registry_record(name, default=default)
1511
1512
1513
def to_display_list(pairs, sort_by="key", allow_empty=True):
1514
    """Create a Plone DisplayList from list items
1515
1516
    :param pairs: list of key, value pairs
1517
    :param sort_by: Sort the items either by key or value
1518
    :param allow_empty: Allow to select an empty value
1519
    :returns: Plone DisplayList
1520
    """
1521
    dl = DisplayList()
1522
1523
    if isinstance(pairs, six.string_types):
1524
        pairs = [pairs, pairs]
1525
    for pair in pairs:
1526
        # pairs is a list of lists -> add each pair
1527
        if isinstance(pair, (tuple, list)):
1528
            dl.add(*pair)
1529
        # pairs is just a single pair -> add it and stop
1530
        if isinstance(pair, six.string_types):
1531
            dl.add(*pairs)
1532
            break
1533
1534
    # add the empty option
1535
    if allow_empty:
1536
        dl.add("", "")
1537
1538
    # sort by key/value
1539
    if sort_by == "key":
1540
        dl = dl.sortedByKey()
1541
    elif sort_by == "value":
1542
        dl = dl.sortedByValue()
1543
1544
    return dl
1545
1546
1547
def text_to_html(text, wrap="p", encoding="utf8"):
1548
    """Convert `\n` sequences in the text to HTML `\n`
1549
1550
    :param text: Plain text to convert
1551
    :param wrap: Toggle to wrap the text in a
1552
    :returns: HTML converted and encoded text
1553
    """
1554
    if not text:
1555
        return ""
1556
    # handle text internally as unicode
1557
    text = safe_unicode(text)
1558
    # replace newline characters with HTML entities
1559
    html = text.replace("\n", "<br/>")
1560
    if wrap:
1561
        html = u"<{tag}>{html}</{tag}>".format(
1562
            tag=wrap, html=html)
1563
    # return encoded html
1564
    return html.encode(encoding)
1565
1566
1567
def to_utf8(string, default=_marker):
1568
    """Encode string to UTF8
1569
1570
    :param string: String to be encoded to UTF8
1571
    :returns: UTF8 encoded string
1572
    """
1573
    if not isinstance(string, six.string_types):
1574
        if default is _marker:
1575
            fail("Expected string type, got '%s'" % type(string))
1576
        return default
1577
    return safe_unicode(string).encode("utf8")
1578
1579
1580
def is_temporary(obj):
1581
    """Returns whether the given object is temporary or not
1582
1583
    :param obj: the object to evaluate
1584
    :returns: True if the object is temporary
1585
    """
1586
    if UID_RX.match(obj.id):
1587
        return True
1588
1589
    parent = aq_parent(aq_inner(obj))
1590
    if not parent:
1591
        return True
1592
1593
    if UID_RX.match(parent.id):
1594
        return True
1595
1596
    if is_at_content(obj):
1597
        # Checks to see if we are created inside the portal_factory. We don't
1598
        # rely here on AT's isFactoryContained because the function is patched
1599
        meta_type = getattr(aq_base(parent), "meta_type", "")
1600
        return meta_type == "TempFolder"
1601
1602
    return False
1603