Passed
Pull Request — 2.x (#1821)
by Ramon
09:15
created

bika.lims.api.get_users_by_roles()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import re
22
from collections import OrderedDict
23
from datetime import datetime
24
from datetime import timedelta
25
from itertools import groupby
26
27
import six
28
29
import Missing
30
from AccessControl.PermissionRole import rolesForPermissionOn
31
from Acquisition import aq_base
32
from bika.lims import logger
33
from bika.lims.interfaces import IClient
34
from bika.lims.interfaces import IContact
35
from bika.lims.interfaces import ILabContact
36
from DateTime import DateTime
37
from DateTime.interfaces import DateTimeError
38
from plone import api as ploneapi
39
from plone.api.exc import InvalidParameterError
40
from plone.app.layout.viewlets.content import ContentHistoryView
41
from plone.behavior.interfaces import IBehaviorAssignable
42
from plone.dexterity.interfaces import IDexterityContent
43
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
44
from plone.i18n.normalizer.interfaces import IIDNormalizer
45
from plone.memoize.volatile import DontCache
46
from Products.Archetypes.atapi import DisplayList
47
from Products.Archetypes.BaseObject import BaseObject
48
from Products.CMFCore.interfaces import IFolderish
49
from Products.CMFCore.interfaces import ISiteRoot
50
from Products.CMFCore.utils import getToolByName
51
from Products.CMFCore.WorkflowCore import WorkflowException
52
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
53
from Products.CMFPlone.utils import _createObjectByType
54
from Products.CMFPlone.utils import base_hasattr
55
from Products.CMFPlone.utils import safe_unicode
56
from Products.PlonePAS.tools.memberdata import MemberData
57
from Products.ZCatalog.interfaces import ICatalogBrain
58
from zope import globalrequest
59
from zope.component import getUtility
60
from zope.component import queryMultiAdapter
61
from zope.component.interfaces import IFactory
62
from zope.event import notify
63
from zope.lifecycleevent import ObjectCreatedEvent
64
from zope.schema import getFieldsInOrder
65
from zope.security.interfaces import Unauthorized
66
67
"""SENAITE LIMS Framework API
68
69
Please see bika.lims/docs/API.rst for documentation.
70
71
Architectural Notes:
72
73
Please add only functions that do a single thing for a single object.
74
75
Good: `def get_foo(brain_or_object)`
76
Bad:  `def get_foos(list_of_brain_objects)`
77
78
Why?
79
80
Because it makes things more complex. You can always use a pattern like this to
81
achieve the same::
82
83
    >>> foos = map(get_foo, list_of_brain_objects)
84
85
Please add for all of your functions a descriptive test in docs/API.rst.
86
87
Thanks.
88
"""
89
90
# Unique sentinel used as the default value of optional parameters in this
# module, so that an explicitly passed `None` can be told apart from "no value
# given" (checked with `is _marker` / `is not _marker`)
_marker = object()
91
92
# Compiled pattern for 32 characters out of [a-z0-9], anchored at the end of
# the string with `$`
UID_RX = re.compile("[a-z0-9]{32}$")
93
94
95
class APIError(Exception):
    """Base exception type for errors signalled by the bika.lims API."""
97
98
99
def get_portal():
    """Return the portal (site root) object

    :returns: Portal object, as returned by `plone.api.portal.getSite`
    """
    site = ploneapi.portal.getSite()
    return site
105
106
107
def get_setup():
    """Look up the `bika_setup` folder in the portal root

    :returns: Whatever the portal's `get` returns for the id "bika_setup"
    """
    return get_portal().get("bika_setup")
112
113
114
def get_bika_setup():
    """Fetch the `bika_setup` folder (delegates to `get_setup`)

    :returns: The result of `get_setup()`
    """
    setup = get_setup()
    return setup
118
119
120
def create(container, portal_type, *args, **kwargs):
    """Create a new content object inside the given container

    This code uses most of the parts from the TypesTool
    see: `Products.CMFCore.TypesTool._constructInstance`

    Two keyword arguments are consumed here and not passed on:

    - `id`: the id of the new object (a temporary id from `tmpID` if omitted)
    - `title`: the title of the new object ("New <portal_type>" if omitted)

    :param container: container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param title: The title for the new content object
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    obj_id = kwargs.pop("id", tmpID())
    obj_title = kwargs.pop("title", "New {}".format(portal_type))

    # look up the type information of the requested portal type
    fti = get_tool("portal_types").getTypeInfo(portal_type)

    if fti.product:
        # type information names a product: create by type, then edit with
        # the title and the remaining keyword arguments
        created = _createObjectByType(portal_type, container, obj_id)
        created.processForm()
        created.edit(title=obj_title, **kwargs)
        return created

    # newstyle factory
    factory = getUtility(IFactory, fti.factory)
    created = factory(obj_id, *args, **kwargs)
    if hasattr(created, '_setPortalTypeName'):
        created._setPortalTypeName(fti.getId())
    created.title = safe_unicode(obj_title)

    # notify that the object was created
    notify(ObjectCreatedEvent(created))

    # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
    # ContainerModifiedEvent
    container._setObject(obj_id, created)

    # fetch the object again by its *current* id, as an event handler might
    # have renamed it already
    return container._getOb(created.getId())
165
166
167
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    If a context is given, the tool is first looked up with `getToolByName`
    on the resolved context object. When that raises an `APIError` or an
    `AttributeError`, a warning is logged and the tool is looked up with
    `plone.api.portal.get_tool` instead.

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: A portal object
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Returned if plone.api does not find a tool of that name
    :returns: Portal Tool
    :raises APIError: if the tool is not found and no default was given
    """
    if context is not None:
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (APIError, AttributeError) as exc:
            # https://github.com/senaite/bika.lims/issues/396
            message = (
                "get_tool::getToolByName({}, '{}') failed: {} "
                "-> falling back to plone.api.portal.get_tool('{}')"
            ).format(repr(context), name, repr(exc), name)
            logger.warn(message)

    # No context was given or the lookup on the context failed:
    # ask the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        return default
196
197
198
def fail(msg=None):
    """Raise an `APIError` carrying the given message

    :param msg: The reason of the failure; "Reason not given." if omitted
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
204
205
206
def is_object(brain_or_object):
    """Tell whether the passed in object is a supported portal content object

    Supported are the portal root, supermodels, AT contents, dexterity
    contents and catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    # evaluated lazily and in this order; stops at the first match
    checks = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    return any(check(brain_or_object) for check in checks)
224
225
226
def get_object(brain_object_uid, default=_marker):
    """Get the full content object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Value to return if the passed in value is not supported
        or, for UIDs, if no object can be found for the UID
    :returns: The full object, or `default` if given and nothing was found
    :raises APIError: if no object can be resolved and no default was given
    """
    if is_uid(brain_object_uid):
        # hand the default over, so that an unknown UID honours it as well
        # instead of always raising
        return get_object_by_uid(brain_object_uid, default=default)
    if is_supermodel(brain_object_uid):
        # supermodels wrap the content object
        return brain_object_uid.instance
    if not is_object(brain_object_uid):
        if default is _marker:
            fail("{} is not supported.".format(repr(brain_object_uid)))
        return default
    if is_brain(brain_object_uid):
        # wake up the object behind the catalog brain
        return brain_object_uid.getObject()
    return brain_object_uid
245
246
247
def is_portal(brain_or_object):
    """Tell whether the passed in object is the portal root object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISiteRoot`
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
256
257
258
def is_brain(brain_or_object):
    """Tell whether the passed in object is a catalog brain

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ICatalogBrain`
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
267
268
269
def is_supermodel(brain_or_object):
    """Tell whether the passed in object is a supermodel

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISuperModel`
    :rtype: bool
    """
    # imported here to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel

    provides_supermodel = ISuperModel.providedBy(brain_or_object)
    return provides_supermodel
280
281
282
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object is a dexterity content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `IDexterityContent`
    :rtype: bool
    """
    provides_dexterity = IDexterityContent.providedBy(brain_or_object)
    return provides_dexterity
291
292
293
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an AT content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an instance of Archetypes' `BaseObject`
    :rtype: bool
    """
    is_archetype = isinstance(brain_or_object, BaseObject)
    return is_archetype
302
303
304
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute on the passed in object takes precedence (it
    is called if it is callable). Otherwise the full object is checked for
    the `IFolderish` interface.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))

    folderish = brain_or_object.is_folderish
    return folderish() if callable(folderish) else folderish
317
318
319
def get_portal_type(brain_or_object):
    """Return the `portal_type` of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: if the passed in object is not supported
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
330
331
332
def get_schema(brain_or_object):
    """Return the schema of the content

    For dexterity contents the schema is looked up through the type
    information of the portal type, for AT contents `Schema()` is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: for the portal root and for objects without a schema
    """
    content = get_object(brain_or_object)

    if is_portal(content):
        fail("get_schema can't return schema of portal root")

    if is_dexterity_content(content):
        type_info = get_tool("portal_types").getTypeInfo(content.portal_type)
        return type_info.lookupSchema()

    if is_at_content(content):
        return content.Schema()

    fail("{} has no Schema.".format(brain_or_object))
349
350
351
def get_fields(brain_or_object):
    """Return a name to field mapping of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)

    if not is_dexterity_content(content):
        return OrderedDict(schema)

    # fields directly provided by the schema interface ...
    name_field_pairs = getFieldsInOrder(schema)

    # ... followed by the fields coming from the behaviors
    assignable = IBehaviorAssignable(content)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            name_field_pairs.extend(getFieldsInOrder(behavior.interface))

    return OrderedDict(name_field_pairs)
372
373
374
def get_id(brain_or_object):
    """Return the Plone ID of the passed in object

    For brains the `getId` or `id` attribute is returned if available, so
    that the object does not need to be loaded.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for attribute in ("getId", "id"):
            if base_hasattr(brain_or_object, attribute):
                return getattr(brain_or_object, attribute)
    return get_object(brain_or_object).getId()
388
389
390
def get_title(brain_or_object):
    """Return the Title of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Title"):
            # take the value from the brain
            return brain_or_object.Title
    content = get_object(brain_or_object)
    return content.Title()
401
402
403
def get_description(brain_or_object):
    """Return the Description of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Description"):
            # take the value from the brain
            return brain_or_object.Description
    content = get_object(brain_or_object)
    return content.Description()
415
416
417
def get_uid(brain_or_object):
    """Return the Plone UID of the passed in object

    A passed in UID is returned as it is. The portal root has the UID '0'.

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_uid(brain_or_object):
        return brain_or_object

    if is_portal(brain_or_object):
        return '0'

    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "UID"):
            # take the value from the brain
            return brain_or_object.UID

    content = get_object(brain_or_object)
    return content.UID()
432
433
434
def get_url(brain_or_object):
    """Return the absolute URL of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "getURL"):
            return brain_or_object.getURL()
    content = get_object(brain_or_object)
    return content.absolute_url()
445
446
447
def get_icon(brain_or_object, html_tag=True):
    """Return the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url; an
        empty string if the type information provides no icon
    :rtype: string
    """
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Contents coming from other catalogs than the
    # `portal_catalog`
    type_info = get_tool("portal_types").getTypeInfo(
        brain_or_object.portal_type)
    icon = type_info.getIcon()
    if not icon:
        return ""

    icon_url = "%s/%s" % (get_url(get_portal()), icon)
    if html_tag:
        # NOTE(review): the title is put into the attribute as it is, without
        # HTML escaping -- confirm that callers can rely on safe titles
        return (
            '<img width="16" height="16" src="{url}" title="{title}" />'
        ).format(url=icon_url, title=get_title(brain_or_object))
    return icon_url
471
472
473
def get_object_by_uid(uid, default=_marker):
    """Find an object by a given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value to return if the UID is empty or no object is found
    :returns: Found Object, or `default` if given and nothing was found
    :raises APIError: if nothing was found and no default was given
    """
    has_default = default is not _marker

    # nothing to do here
    if not uid:
        if has_default:
            return default
        fail("get_object_by_uid requires UID as first argument; got {} instead"
             .format(uid))

    # we defined the portal object UID to be '0'::
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)
    if brain is not None:
        return get_object(brain)

    if has_default:
        return default
    fail("No object found for UID {}".format(uid))
500
501
502
def get_brain_by_uid(uid, default=None):
    """Query a brain by a given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value to return if the passed in value is no UID or if
        the search does not return exactly one brain
    :returns: ZCatalog brain or the default (None if not given)
    """
    if not is_uid(uid):
        return default

    # search the UID catalog
    uid_catalog = get_tool("uid_catalog")
    found = uid_catalog(UID=uid)

    # only an unambiguous result is accepted
    return found[0] if len(found) == 1 else default
520
521
522
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path of the object to find
    :type path: string
    :param default: Value to return if the path is empty, not located inside
        the portal or if no object can be traversed to
    :returns: Found Object, the default if given, or None if the traversal
        did not find an object and no default was given
    :raises APIError: if no default was given and the path is empty or not
        located inside the portal
    """

    # nothing to do here
    if not path:
        if default is not _marker:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if default is not _marker:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    # Never hand the module-private sentinel over to the traversal: it would
    # be returned to the caller if the path cannot be traversed. Fall back to
    # None instead, as promised by the documentation.
    fallback = None if default is _marker else default
    return portal.restrictedTraverse(path, fallback)
555
556
557
def get_path(brain_or_object):
    """Return the physical path of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if is_brain(brain_or_object):
        return brain_or_object.getPath()
    segments = get_object(brain_or_object).getPhysicalPath()
    return "/".join(segments)
568
569
570
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the passed in object

    For the portal root, the path of the portal itself is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())

    if is_brain(brain_or_object):
        # strip the last path segment off the path of the brain
        parent_path, _sep, _last = get_path(brain_or_object).rpartition("/")
        return parent_path

    parent = get_object(brain_or_object).aq_parent
    return get_path(parent)
584
585
586
def get_parent(brain_or_object, catalog_search=False):
    """Locate the parent object of the content/catalog brain

    With `catalog_search` enabled, the `portal_catalog` is searched for the
    parent path and a brain is returned instead of the full parent object.
    If the parent is the portal, the portal object is returned. If the search
    has no results, the full parent object is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if is_portal(brain_or_object):
        return get_portal()

    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # parent is the portal object
    if parent_path == get_path(get_portal()):
        return get_portal()

    # query the catalog for exactly the parent path
    catalog = get_portal_catalog()
    path_query = {"query": parent_path, "depth": 0}
    brains = catalog(path=path_query)

    if brains:
        return brains[0]

    # No results fallback: return the parent object
    return get_object(brain_or_object).aq_parent
628
629
630
def search(query, catalog=_marker):
    """Search for objects.

    If no catalog is given, the first catalog returned by `get_catalogs_for`
    is taken for each queried portal type. The `portal_catalog` is used if no
    catalog could be determined at all.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises APIError: if the query is no dictionary or if more than one
        distinct catalog would have to be queried
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # The queried portal types, always handled as a sequence
    queried_types = query.get("portal_type", [])
    if not isinstance(queried_types, (tuple, list)):
        queried_types = [queried_types]

    if catalog is _marker:
        # No catalog specified: take the first registered/default catalog of
        # each queried portal type
        tools = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in queried_types
        ]
    elif isinstance(catalog, (list, tuple)):
        # Several user defined catalogs
        tools = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        # A single user defined catalog
        tools = [get_tool(catalog)]

    # Avoid duplicate catalogs
    tools = list(set(tools)) or [get_portal_catalog()]

    # We only support **single** catalog queries
    if len(tools) > 1:
        fail("Multi Catalog Queries are not supported!")

    return tools[0](query)
676
677
678
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of the attribute; callables are called

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value to return if the attribute is missing or if the
        access is not authorized
    :returns: Attribute value
    :rtype: obj
    :raises APIError: if the attribute is missing or not accessible and no
        default was given
    """
    has_default = default is not _marker
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        return value() if callable(value) else value
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
702
703
704
def get_portal_catalog():
    """Return the `portal_catalog` tool

    :returns: Portal Catalog Tool
    """
    catalog = get_tool("portal_catalog")
    return catalog
710
711
712
def get_review_history(brain_or_object, rev=True):
    """Return the review history of the given brain or context.

    A `WorkflowException` on retrieval is logged and results in an empty
    history.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: Pass `True` (the default) to get the history reversed
    :type rev: bool
    :returns: Workflow history
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        wf_tool = get_tool("portal_workflow")
        history = wf_tool.getInfoFor(obj, 'review_history')
    except WorkflowException as exc:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(exc)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
        # Expression.StateChangeInfo, that always returns a list, except when no
        # review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    # only the literal `True` reverses the history
    if rev is True:
        history.reverse()
    return history
749
750
751
def get_revision_history(brain_or_object):
    """Return the revision history of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The result of `ContentHistoryView.fullHistory()` for the object
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # the view is built with the request of the object (None if missing)
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
762
763
764
def get_workflows_for(brain_or_object):
    """Return the assigned workflows of the given brain or context.

    Note: This function supports also the portal_type as parameter.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    if isinstance(brain_or_object, six.string_types):
        # a string is handed over to the workflow tool as it is
        target = brain_or_object
    else:
        target = get_object(brain_or_object)
    return wf_tool.getChainFor(target)
779
780
781
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the current workflow status of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status
    :rtype: str
    """
    # Take the state from the catalog brain if it holds the variable
    if is_brain(brain_or_object) and state_var in brain_or_object.schema():
        return brain_or_object[state_var]

    # Otherwise ask the workflow tool for the state of the object
    wf_tool = get_tool("portal_workflow")
    content = get_object(brain_or_object)
    return wf_tool.getInfoFor(ob=content, name=state_var, default='')
800
801
802
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the previous workflow status of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value to return if no previous status is found
    :returns: status
    :rtype: str
    """
    # anything but a non-empty list/tuple means "skip nothing"
    skipped = skip if isinstance(skip, (list, tuple)) and skip else []

    # Remove consecutive duplicates, some transitions might happen more than
    # once consecutively (e.g. publish)
    entries = [
        entry for entry, _duplicates
        in groupby(get_review_history(brain_or_object))
    ]

    # the first entry is the current one and is therefore left out
    for entry in entries[1:]:
        status = entry.get("review_state")
        if status not in skipped:
            return status
    return default
829
830
831
def get_creation_date(brain_or_object):
    """Return the creation date of the brain or object

    The `created` attribute is read from the passed in object and called if
    it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    :raises APIError: if the object has no `created` attribute or it is None
    """
    creation = getattr(brain_or_object, "created", None)
    if creation is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    return creation() if callable(creation) else creation
846
847
848
def get_modification_date(brain_or_object):
    """Return the modification date of the brain or object

    The `modified` attribute is read from the passed in object and called if
    it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    :raises APIError: if the object has no `modified` attribute or it is None
    """
    modification = getattr(brain_or_object, "modified", None)
    if modification is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    return modification() if callable(modification) else modification
863
864
865
def get_review_status(brain_or_object):
    """Return the `review_state` of a brain or object

    For catalog brains the `review_state` attribute is read directly; for
    anything else the workflow status is looked up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_state variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
876
877
878
def is_active(brain_or_object):
    """Check that the object is neither 'inactive' nor 'cancelled'.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the review state is 'inactive' or 'cancelled',
        True otherwise
    :rtype: bool
    """
    inactive_states = ("cancelled", "inactive")
    return get_review_status(brain_or_object) not in inactive_states
889
890
891
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Return the catalogs registered for a portal_type, brain or object

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog tool to fall back to when no catalog
        is registered
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", default=None)
    if at_tool is None:
        # no archetype tool available: only the default catalog is known
        return [get_tool(default, default="portal_catalog")]

    registered = []

    # content objects/brains are resolved through their portal_type
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        registered = at_tool.getCatalogsByType(portal_type)
    # a plain string is taken as the portal_type itself
    if isinstance(brain_or_object, six.string_types):
        registered = at_tool.getCatalogsByType(brain_or_object)

    return registered or [get_tool(default)]
917
918
919
def get_transitions_for(brain_or_object):
    """Collect the workflow transitions reported by every assigned workflow

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    obj = get_object(brain_or_object)
    for wf_id in get_workflows_for(brain_or_object):
        candidates = wf_tool[wf_id].getTransitionsFor(obj)
        # only add transitions not already gathered from earlier workflows
        fresh = [item for item in candidates if item not in collected]
        collected += fresh
    return collected
935
936
937
def do_transition_for(brain_or_object, transition):
    """Perform a workflow transition on the passed in object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: Id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, six.string_types):
        kind = type(transition)
        fail("Transition type needs to be string, got '%s'" % kind)
    target = get_object(brain_or_object)
    try:
        ploneapi.content.transition(target, transition)
    except ploneapi.exc.InvalidParameterError as exc:
        reason = str(exc)
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, target, reason)
        fail(message)
    return target
953
954
955
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted, de-duplicated roles for a permission on an object.

    :param permission: The permission to look up the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    content = get_object(brain_or_object)
    roles = rolesForPermissionOn(permission, content)
    # set() removes duplicates before sorting
    return sorted(set(roles))
966
967
968
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object must support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honour the `policy` argument; its default matches the value that was
    # previously hard-coded here, so existing callers behave the same
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
980
981
982
def get_version(brain_or_object):
    """Return the `version_id` of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The `version_id` of the object (0 if it has none), or None if
        the object is not versionable
    :rtype: int or None
    """
    content = get_object(brain_or_object)
    if is_versionable(content):
        # read from the unwrapped object to bypass acquisition
        return getattr(aq_base(content), "version_id", 0)
    return None
994
995
996
def get_view(name, context=None, request=None, default=None):
    """Look up a view by name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view; falls back to the portal
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view; falls back to the
        global request
    :type request: HTTPRequest object
    :param default: Value returned if no view is found
    :returns: The view, or `default` if no view is registered for the name
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    adapted = (get_object(context), request)
    view = queryMultiAdapter(adapted, name=name)
    return default if view is None else view
1014
1015
1016
def get_request():
    """Return the request provided by `globalrequest`

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
1023
1024
1025
def get_group(group_or_groupname):
    """Return the Plone group

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData, or None for an empty value
    """
    if group_or_groupname:
        # objects exposing `_getGroup` are taken as group objects already
        if hasattr(group_or_groupname, "_getGroup"):
            return group_or_groupname
        groups_tool = get_tool("portal_groups")
        return groups_tool.getGroupById(group_or_groupname)
    return None
1039
1040
1041
def get_user(user_or_username):
    """Return the Plone user

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None if the value is neither a
        MemberData nor a string
    """
    # strings are resolved through the login name lookup
    if isinstance(user_or_username, six.string_types):
        portal = get_portal()
        return get_member_by_login_name(portal, user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1053
1054
1055
def get_user_properties(user_or_username):
    """Return the properties of a user merged from all its property sheets

    :param user_or_username: Plone user or user id
    :returns: Dictionary of user properties; empty if the user is not found
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}
    plone_user = member.getUser()
    properties = {}
    # later sheets override properties of earlier sheets with the same key
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1072
1073
1074
def get_users_by_roles(roles=None):
    """Search Plone users by their roles

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    # a single value is wrapped into a list for the membership search
    role_list = roles if isinstance(roles, (tuple, list)) else [roles]
    membership = get_tool("portal_membership")
    return membership.searchForMembers(roles=role_list)
1085
1086
1087
def get_current_user():
    """Return the user reported by `plone.api` as the current one

    :returns: Current User
    """
    current = ploneapi.user.get_current()
    return current
1093
1094
1095
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Return the contact associated to a Plone user

    Returns None if the user has no contact associated, or if more than one
    contact is bound to the user (an error is logged in the latter case).

    :param user: Plone user
    :param contact_types: List with the contact portal types to search
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    criteria = dict(portal_type=contact_types, getUsername=user.id)
    found = search(criteria, catalog='portal_catalog')
    if not found:
        return None

    if len(found) > 1:
        # ambiguous binding: log the conflicting contacts and give up
        titles = ','.join([brain.Title for brain in found])
        template = "User '{}' is bound to multiple Contacts '{}'"
        logger.error(template.format(user.id, titles))
        return None

    return get_object(found[0])
1122
1123
1124
def get_user_client(user_or_contact):
    """Return the client the contact of a Plone user belongs to

    Returns None if the user passed in has no contact or the contact does
    not belong to any client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact:
        return None

    if ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # the client is expected to be the parent of the contact
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # a user was passed in: resolve its client contact first
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if not IContact.providedBy(contact):
        return None
    return get_user_client(contact)
1148
1149
1150
def get_current_client():
    """Return the client the current logged in user belongs to, if any

    :returns: Client the current logged in user belongs to or None
    """
    user = get_current_user()
    return get_user_client(user)
1156
1157
1158
def get_cache_key(brain_or_object):
    """Build a cache key for a brain or object

    The key joins portal type, id, UID, URL and modification time with "-".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # the URL keeps keys apart when served from different domains
        get_url(brain_or_object),
        # microseconds since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join(str(part) for part in parts)
1176
1177
1178
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object

    :param method: The decorated method (not used to build the key)
    :param self: The instance the method is bound to (not used)
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1189
1190
1191
def normalize_id(string):
    """Normalize the id

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'"
        fail(message.format(type(string)))
    # delegate to the registered id normalizer utility
    utility = getUtility(IIDNormalizer)
    return utility.normalize(string)
1205
1206
1207
def normalize_filename(string):
    """Normalize the filename

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        message = "Type of argument must be string, found '{}'"
        fail(message.format(type(string)))
    # delegate to the registered filename normalizer utility
    utility = getUtility(IFileNameNormalizer)
    return utility.normalize(string)
1221
1222
1223
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    :param uid: The uid to check
    :param validate: If False, only checks that uid is the string '0' or a
        32 characters long string matching `UID_RX`. If True, also verifies
        that a brain exists for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    # '0' is always accepted, without any further check
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    uid_catalog = get_tool('uid_catalog')
    matches = uid_catalog(UID=uid)
    if matches:
        assert (len(matches) == 1)
    return len(matches) > 0
1250
1251
1252
def is_date(date):
    """Checks if the passed in value is a date object

    :param date: The date to check
    :type date: DateTime or datetime
    :return: True if the value is truthy and either a Zope DateTime or a
        Python datetime
    :rtype: bool
    """
    return bool(date) and isinstance(date, (DateTime, datetime))
1263
1264
1265
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Value converted and returned instead when `value` is
        empty or cannot be converted
    :return: The DateTime representation of the value passed in, the
        converted default, or None
    """
    if isinstance(value, DateTime):
        return value
    if not value:
        if default is None:
            return None
        return to_date(default)
    try:
        # Check against all string types, so that unicode values on
        # Python 2 also take the 'international' parsing path below
        if isinstance(value, six.string_types) and '.' in value:
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
            return DateTime(value, datefmt='international')
        return DateTime(value)
    except (TypeError, ValueError, DateTimeError):
        return to_date(default)
1285
1286
1287
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Returns the computed total number of minutes

    Every argument is converted with `float()` before being added up.

    :param round_to_int: If True, the total is rounded and returned as int;
        otherwise the float total is returned
    :returns: Total number of minutes
    """
    total = float(days) * 24 * 60
    total += float(hours) * 60
    total += float(minutes)
    total += float(seconds) / 60
    total += float(milliseconds) / 1000 / 60
    if round_to_int:
        return int(round(total))
    return total
1294
1295
1296
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Returns a representation of time in a string in xd yh zm format

    Zero parts are left out, except for the hours part, which is always
    rendered when both the days and the minutes part are present.
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))
    hours_count, remainder = divmod(delta.seconds, 3600)
    minutes_count = remainder // 60

    days_part = "{}d ".format(str(delta.days)) if delta.days else ""
    minutes_part = "{}m ".format(str(minutes_count)) if minutes_count else ""
    if (minutes_part and days_part) or hours_count:
        hours_part = "{}h ".format(str(hours_count))
    else:
        hours_part = ""
    return (days_part + hours_part + minutes_part).strip()
1312
1313
1314
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    :param value: The value to be converted to an int
    :param default: Value converted and returned if `value` cannot be
        converted. None is returned as-is. If omitted, the conversion
        failure is reported through `fail`
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is raised by int() for infinite floats (e.g. "inf"),
        # which are floatable but have no int representation
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1331
1332
1333
def is_floatable(value):
    """Checks if the passed in value can be converted with `float()`

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    return True
1345
1346
1347
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Value converted and returned if `value` is not
        floatable. If omitted, the failure is reported through `fail`
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    return float(value)
1360
1361
1362
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Empty values, `Missing.Value`, UIDs and booleans give an empty text.
    Lists, tuples and dicts are processed recursively.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value (empty if nothing searchable)
    """
    if (not value or value is Missing.Value or is_uid(value)
            or isinstance(value, (bool))):
        return u""
    if isinstance(value, (list, tuple)):
        # convert each item and drop those without searchable text
        texts = [to_searchable_text_metadata(item) for item in value]
        return " ".join([text for text in texts if text])
    if isinstance(value, dict):
        return to_searchable_text_metadata(value.values())
    if is_date(value):
        return value.strftime("%Y-%m-%d")
    if is_at_content(value):
        title = get_title(value)
        return to_searchable_text_metadata(title)
    if isinstance(value, six.string_types):
        return safe_unicode(value)
    return safe_unicode(str(value))
1389
1390
1391
def get_registry_record(name, default=None):
    """Return the value of a registry record, looked up through `plone.api`

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    record = ploneapi.portal.get_registry_record(name, default=default)
    return record
1401
1402
1403
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Create a Plone DisplayList from list items

    :param pairs: list of key, value pairs, a single key/value pair, or a
        single string used as both key and value
    :param sort_by: Sort the items either by "key" or by "value"; any other
        value leaves the items unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a bare string becomes a single pair with identical key and value
    if isinstance(pairs, six.string_types):
        pairs = [pairs, pairs]

    for entry in pairs:
        if isinstance(entry, (tuple, list)):
            # pairs is a list of pairs -> add each of them
            display_list.add(*entry)
        elif isinstance(entry, six.string_types):
            # pairs itself is one single pair -> add it and stop
            display_list.add(*pairs)
            break

    # add the empty option
    if allow_empty:
        display_list.add("", "")

    # sort by key/value
    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
1435
1436
1437
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert newline characters in the text to HTML line breaks

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the text in; a falsy value
        disables the wrapping
    :param encoding: Encoding used for the returned HTML
    :returns: HTML converted and encoded text
    """
    if not text:
        return ""
    # work on unicode internally and turn newlines into break tags
    markup = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        markup = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=markup)
    # return encoded html
    return markup.encode(encoding)
1455