Passed
Push — 2.x ( 13498f...877952 )
by Ramon
08:06 queued 03:15
created

bika.lims.api.get_schema()   A

Complexity

Conditions 4

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 17
rs 9.85
c 0
b 0
f 0
cc 4
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import re
22
from collections import OrderedDict
23
from datetime import datetime
24
from datetime import timedelta
25
from itertools import groupby
26
27
import six
28
29
import Missing
30
from AccessControl.PermissionRole import rolesForPermissionOn
31
from Acquisition import aq_base
32
from bika.lims import logger
33
from bika.lims.interfaces import IClient
34
from bika.lims.interfaces import IContact
35
from bika.lims.interfaces import ILabContact
36
from DateTime import DateTime
37
from DateTime.interfaces import DateTimeError
38
from plone import api as ploneapi
39
from plone.api.exc import InvalidParameterError
40
from plone.app.layout.viewlets.content import ContentHistoryView
41
from plone.behavior.interfaces import IBehaviorAssignable
42
from plone.dexterity.interfaces import IDexterityContent
43
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
44
from plone.i18n.normalizer.interfaces import IIDNormalizer
45
from plone.memoize.volatile import DontCache
46
from Products.Archetypes.atapi import DisplayList
47
from Products.Archetypes.BaseObject import BaseObject
48
from Products.CMFCore.interfaces import IFolderish
49
from Products.CMFCore.interfaces import ISiteRoot
50
from Products.CMFCore.utils import getToolByName
51
from Products.CMFCore.WorkflowCore import WorkflowException
52
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
53
from Products.CMFPlone.utils import _createObjectByType
54
from Products.CMFPlone.utils import base_hasattr
55
from Products.CMFPlone.utils import safe_unicode
56
from Products.PlonePAS.tools.memberdata import MemberData
57
from Products.ZCatalog.interfaces import ICatalogBrain
58
from zope import globalrequest
59
from zope.component import getUtility
60
from zope.component import queryMultiAdapter
61
from zope.component.interfaces import IFactory
62
from zope.event import notify
63
from zope.lifecycleevent import ObjectCreatedEvent
64
from zope.schema import getFieldsInOrder
65
from zope.security.interfaces import Unauthorized
66
67
"""SENAITE LIMS Framework API
68
69
Please see bika.lims/docs/API.rst for documentation.
70
71
Architecural Notes:
72
73
Please add only functions that do a single thing for a single object.
74
75
Good: `def get_foo(brain_or_object)`
76
Bad:  `def get_foos(list_of_brain_objects)`
77
78
Why?
79
80
Because it makes things more complex. You can always use a pattern like this to
81
achieve the same::
82
83
    >>> foos = map(get_foo, list_of_brain_objects)
84
85
Please add for all of your functions a descriptive test in docs/API.rst.
86
87
Thanks.
88
"""
89
90
_marker = object()
91
92
UID_RX = re.compile("[a-z0-9]{32}$")
93
94
95
class APIError(Exception):
96
    """Base exception class for bika.lims errors."""
97
98
99
def get_portal():
100
    """Get the portal object
101
102
    :returns: Portal object
103
    """
104
    return ploneapi.portal.getSite()
105
106
107
def get_setup():
108
    """Fetch the `bika_setup` folder.
109
    """
110
    portal = get_portal()
111
    return portal.get("bika_setup")
112
113
114
def get_bika_setup():
115
    """Fetch the `bika_setup` folder.
116
    """
117
    return get_setup()
118
119
120
def create(container, portal_type, *args, **kwargs):
121
    """Creates an object in Bika LIMS
122
123
    This code uses most of the parts from the TypesTool
124
    see: `Products.CMFCore.TypesTool._constructInstance`
125
126
    :param container: container
127
    :type container: ATContentType/DexterityContentType/CatalogBrain
128
    :param portal_type: The portal type to create, e.g. "Client"
129
    :type portal_type: string
130
    :param title: The title for the new content object
131
    :type title: string
132
    :returns: The new created object
133
    """
134
    from bika.lims.utils import tmpID
135
136
    id = kwargs.pop("id", tmpID())
137
    title = kwargs.pop("title", "New {}".format(portal_type))
138
139
    # get the fti
140
    types_tool = get_tool("portal_types")
141
    fti = types_tool.getTypeInfo(portal_type)
142
143
    if fti.product:
144
        obj = _createObjectByType(portal_type, container, id)
145
        obj.processForm()
146
        obj.edit(title=title, **kwargs)
147
    else:
148
        # newstyle factory
149
        factory = getUtility(IFactory, fti.factory)
150
        obj = factory(id, *args, **kwargs)
151
        if hasattr(obj, '_setPortalTypeName'):
152
            obj._setPortalTypeName(fti.getId())
153
        # set the title
154
        obj.title = safe_unicode(title)
155
        # notify that the object was created
156
        notify(ObjectCreatedEvent(obj))
157
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
158
        # ContainerModifiedEvent
159
        container._setObject(id, obj)
160
        # we get the object here with the current object id, as it might be
161
        # renamed already by an event handler
162
        obj = container._getOb(obj.getId())
163
164
    return obj
165
166
167
def get_tool(name, context=None, default=_marker):
168
    """Get a portal tool by name
169
170
    :param name: The name of the tool, e.g. `portal_catalog`
171
    :type name: string
172
    :param context: A portal object
173
    :type context: ATContentType/DexterityContentType/CatalogBrain
174
    :returns: Portal Tool
175
    """
176
177
    # Try first with the context
178
    if context is not None:
179
        try:
180
            context = get_object(context)
181
            return getToolByName(context, name)
182
        except (APIError, AttributeError) as e:
183
            # https://github.com/senaite/bika.lims/issues/396
184
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
185
                        "-> falling back to plone.api.portal.get_tool('{}')"
186
                        .format(repr(context), name, repr(e), name))
187
            return get_tool(name, default=default)
188
189
    # Try with the plone api
190
    try:
191
        return ploneapi.portal.get_tool(name)
192
    except InvalidParameterError:
193
        if default is not _marker:
194
            if isinstance(default, six.string_types):
195
                return get_tool(default)
196
            return default
197
        fail("No tool named '%s' found." % name)
198
199
200
def fail(msg=None):
201
    """API LIMS Error
202
    """
203
    if msg is None:
204
        msg = "Reason not given."
205
    raise APIError("{}".format(msg))
206
207
208
def is_object(brain_or_object):
209
    """Check if the passed in object is a supported portal content object
210
211
    :param brain_or_object: A single catalog brain or content object
212
    :type brain_or_object: Portal Object
213
    :returns: True if the passed in object is a valid portal content
214
    """
215
    if is_portal(brain_or_object):
216
        return True
217
    if is_supermodel(brain_or_object):
218
        return True
219
    if is_at_content(brain_or_object):
220
        return True
221
    if is_dexterity_content(brain_or_object):
222
        return True
223
    if is_brain(brain_or_object):
224
        return True
225
    return False
226
227
228
def get_object(brain_object_uid, default=_marker):
229
    """Get the full content object
230
231
    :param brain_object_uid: A catalog brain or content object or uid
232
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
233
    /CatalogBrain/basestring
234
    :returns: The full object
235
    """
236
    if is_uid(brain_object_uid):
237
        return get_object_by_uid(brain_object_uid)
238
    elif is_supermodel(brain_object_uid):
239
        return brain_object_uid.instance
240
    if not is_object(brain_object_uid):
241
        if default is _marker:
242
            fail("{} is not supported.".format(repr(brain_object_uid)))
243
        return default
244
    if is_brain(brain_object_uid):
245
        return brain_object_uid.getObject()
246
    return brain_object_uid
247
248
249
def is_portal(brain_or_object):
250
    """Checks if the passed in object is the portal root object
251
252
    :param brain_or_object: A single catalog brain or content object
253
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
254
    :returns: True if the object is the portal root object
255
    :rtype: bool
256
    """
257
    return ISiteRoot.providedBy(brain_or_object)
258
259
260
def is_brain(brain_or_object):
261
    """Checks if the passed in object is a portal catalog brain
262
263
    :param brain_or_object: A single catalog brain or content object
264
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
265
    :returns: True if the object is a catalog brain
266
    :rtype: bool
267
    """
268
    return ICatalogBrain.providedBy(brain_or_object)
269
270
271
def is_supermodel(brain_or_object):
272
    """Checks if the passed in object is a supermodel
273
274
    :param brain_or_object: A single catalog brain or content object
275
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
276
    :returns: True if the object is a catalog brain
277
    :rtype: bool
278
    """
279
    # avoid circular imports
280
    from senaite.app.supermodel.interfaces import ISuperModel
281
    return ISuperModel.providedBy(brain_or_object)
282
283
284
def is_dexterity_content(brain_or_object):
285
    """Checks if the passed in object is a dexterity content type
286
287
    :param brain_or_object: A single catalog brain or content object
288
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
289
    :returns: True if the object is a dexterity content type
290
    :rtype: bool
291
    """
292
    return IDexterityContent.providedBy(brain_or_object)
293
294
295
def is_at_content(brain_or_object):
296
    """Checks if the passed in object is an AT content type
297
298
    :param brain_or_object: A single catalog brain or content object
299
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
300
    :returns: True if the object is an AT content type
301
    :rtype: bool
302
    """
303
    return isinstance(brain_or_object, BaseObject)
304
305
306
def is_folderish(brain_or_object):
307
    """Checks if the passed in object is folderish
308
309
    :param brain_or_object: A single catalog brain or content object
310
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
311
    :returns: True if the object is folderish
312
    :rtype: bool
313
    """
314
    if hasattr(brain_or_object, "is_folderish"):
315
        if callable(brain_or_object.is_folderish):
316
            return brain_or_object.is_folderish()
317
        return brain_or_object.is_folderish
318
    return IFolderish.providedBy(get_object(brain_or_object))
319
320
321
def get_portal_type(brain_or_object):
322
    """Get the portal type for this object
323
324
    :param brain_or_object: A single catalog brain or content object
325
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
326
    :returns: Portal type
327
    :rtype: string
328
    """
329
    if not is_object(brain_or_object):
330
        fail("{} is not supported.".format(repr(brain_or_object)))
331
    return brain_or_object.portal_type
332
333
334
def get_schema(brain_or_object):
335
    """Get the schema of the content
336
337
    :param brain_or_object: A single catalog brain or content object
338
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
339
    :returns: Schema object
340
    """
341
    obj = get_object(brain_or_object)
342
    if is_portal(obj):
343
        fail("get_schema can't return schema of portal root")
344
    if is_dexterity_content(obj):
345
        pt = get_tool("portal_types")
346
        fti = pt.getTypeInfo(obj.portal_type)
347
        return fti.lookupSchema()
348
    if is_at_content(obj):
349
        return obj.Schema()
350
    fail("{} has no Schema.".format(brain_or_object))
351
352
353
def get_fields(brain_or_object):
354
    """Get a name to field mapping of the object
355
356
    :param brain_or_object: A single catalog brain or content object
357
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
358
    :returns: Mapping of name -> field
359
    :rtype: OrderedDict
360
    """
361
    obj = get_object(brain_or_object)
362
    schema = get_schema(obj)
363
    if is_dexterity_content(obj):
364
        # get the fields directly provided by the interface
365
        fields = getFieldsInOrder(schema)
366
        # append the fields coming from behaviors
367
        behavior_assignable = IBehaviorAssignable(obj)
368
        if behavior_assignable:
369
            behaviors = behavior_assignable.enumerateBehaviors()
370
            for behavior in behaviors:
371
                fields.extend(getFieldsInOrder(behavior.interface))
372
        return OrderedDict(fields)
373
    return OrderedDict(schema)
374
375
376
def get_id(brain_or_object):
377
    """Get the Plone ID for this object
378
379
    :param brain_or_object: A single catalog brain or content object
380
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
381
    :returns: Plone ID
382
    :rtype: string
383
    """
384
    if is_brain(brain_or_object):
385
        if base_hasattr(brain_or_object, "getId"):
386
            return brain_or_object.getId
387
        if base_hasattr(brain_or_object, "id"):
388
            return brain_or_object.id
389
    return get_object(brain_or_object).getId()
390
391
392
def get_title(brain_or_object):
393
    """Get the Title for this object
394
395
    :param brain_or_object: A single catalog brain or content object
396
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
397
    :returns: Title
398
    :rtype: string
399
    """
400
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
401
        return brain_or_object.Title
402
    return get_object(brain_or_object).Title()
403
404
405
def get_description(brain_or_object):
406
    """Get the Title for this object
407
408
    :param brain_or_object: A single catalog brain or content object
409
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
410
    :returns: Title
411
    :rtype: string
412
    """
413
    if is_brain(brain_or_object) \
414
            and base_hasattr(brain_or_object, "Description"):
415
        return brain_or_object.Description
416
    return get_object(brain_or_object).Description()
417
418
419
def get_uid(brain_or_object):
420
    """Get the Plone UID for this object
421
422
    :param brain_or_object: A single catalog brain or content object or an UID
423
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
424
    :returns: Plone UID
425
    :rtype: string
426
    """
427
    if is_uid(brain_or_object):
428
        return brain_or_object
429
    if is_portal(brain_or_object):
430
        return '0'
431
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
432
        return brain_or_object.UID
433
    return get_object(brain_or_object).UID()
434
435
436
def get_url(brain_or_object):
437
    """Get the absolute URL for this object
438
439
    :param brain_or_object: A single catalog brain or content object
440
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
441
    :returns: Absolute URL
442
    :rtype: string
443
    """
444
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
445
        return brain_or_object.getURL()
446
    return get_object(brain_or_object).absolute_url()
447
448
449
def get_icon(brain_or_object, html_tag=True):
450
    """Get the icon of the content object
451
452
    :param brain_or_object: A single catalog brain or content object
453
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
454
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
455
    :type html_tag: bool
456
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
457
    :rtype: string
458
    """
459
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
460
    # work for Contents coming from other catalogs than the
461
    # `portal_catalog`
462
    portal_types = get_tool("portal_types")
463
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
464
    icon = fti.getIcon()
465
    if not icon:
466
        return ""
467
    url = "%s/%s" % (get_url(get_portal()), icon)
468
    if not html_tag:
469
        return url
470
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
471
        url=url, title=get_title(brain_or_object))
472
    return tag
473
474
475
def get_object_by_uid(uid, default=_marker):
476
    """Find an object by a given UID
477
478
    :param uid: The UID of the object to find
479
    :type uid: string
480
    :returns: Found Object or None
481
    """
482
483
    # nothing to do here
484
    if not uid:
485
        if default is not _marker:
486
            return default
487
        fail("get_object_by_uid requires UID as first argument; got {} instead"
488
             .format(uid))
489
490
    # we defined the portal object UID to be '0'::
491
    if uid == '0':
492
        return get_portal()
493
494
    brain = get_brain_by_uid(uid)
495
496
    if brain is None:
497
        if default is not _marker:
498
            return default
499
        fail("No object found for UID {}".format(uid))
500
501
    return get_object(brain)
502
503
504
def get_brain_by_uid(uid, default=None):
505
    """Query a brain by a given UID
506
507
    :param uid: The UID of the object to find
508
    :type uid: string
509
    :returns: ZCatalog brain or None
510
    """
511
    if not is_uid(uid):
512
        return default
513
514
    # we try to find the object with the UID catalog
515
    uc = get_tool("uid_catalog")
516
517
    # try to find the object with the reference catalog first
518
    brains = uc(UID=uid)
519
    if len(brains) != 1:
520
        return default
521
    return brains[0]
522
523
524
def get_object_by_path(path, default=_marker):
525
    """Find an object by a given physical path or absolute_url
526
527
    :param path: The physical path of the object to find
528
    :type path: string
529
    :returns: Found Object or None
530
    """
531
532
    # nothing to do here
533
    if not path:
534
        if default is not _marker:
535
            return default
536
        fail("get_object_by_path first argument must be a path; {} received"
537
             .format(path))
538
539
    portal = get_portal()
540
    portal_path = get_path(portal)
541
    portal_url = get_url(portal)
542
543
    # ensure we have a physical path
544
    if path.startswith(portal_url):
545
        request = get_request()
546
        path = "/".join(request.physicalPathFromURL(path))
547
548
    if not path.startswith(portal_path):
549
        if default is not _marker:
550
            return default
551
        fail("Not a physical path inside the portal.")
552
553
    if path == portal_path:
554
        return portal
555
556
    return portal.restrictedTraverse(path, default)
557
558
559
def get_path(brain_or_object):
560
    """Calculate the physical path of this object
561
562
    :param brain_or_object: A single catalog brain or content object
563
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
564
    :returns: Physical path of the object
565
    :rtype: string
566
    """
567
    if is_brain(brain_or_object):
568
        return brain_or_object.getPath()
569
    return "/".join(get_object(brain_or_object).getPhysicalPath())
570
571
572
def get_parent_path(brain_or_object):
573
    """Calculate the physical parent path of this object
574
575
    :param brain_or_object: A single catalog brain or content object
576
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
577
    :returns: Physical path of the parent object
578
    :rtype: string
579
    """
580
    if is_portal(brain_or_object):
581
        return get_path(get_portal())
582
    if is_brain(brain_or_object):
583
        path = get_path(brain_or_object)
584
        return path.rpartition("/")[0]
585
    return get_path(get_object(brain_or_object).aq_parent)
586
587
588
def get_parent(brain_or_object, catalog_search=False):
589
    """Locate the parent object of the content/catalog brain
590
591
    The `catalog_search` switch uses the `portal_catalog` to do a search return
592
    a brain instead of the full parent object. However, if the search returned
593
    no results, it falls back to return the full parent object.
594
595
    :param brain_or_object: A single catalog brain or content object
596
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
597
    :param catalog_search: Use a catalog query to find the parent object
598
    :type catalog_search: bool
599
    :returns: parent object
600
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
601
    """
602
603
    if is_portal(brain_or_object):
604
        return get_portal()
605
606
    # Do a catalog search and return the brain
607
    if catalog_search:
608
        parent_path = get_parent_path(brain_or_object)
609
610
        # parent is the portal object
611
        if parent_path == get_path(get_portal()):
612
            return get_portal()
613
614
        # get the catalog tool
615
        pc = get_portal_catalog()
616
617
        # query for the parent path
618
        results = pc(path={
619
            "query": parent_path,
620
            "depth": 0})
621
622
        # No results fallback: return the parent object
623
        if not results:
624
            return get_object(brain_or_object).aq_parent
625
626
        # return the brain
627
        return results[0]
628
629
    return get_object(brain_or_object).aq_parent
630
631
632
def search(query, catalog=_marker):
633
    """Search for objects.
634
635
    :param query: A suitable search query.
636
    :type query: dict
637
    :param catalog: A single catalog id or a list of catalog ids
638
    :type catalog: str/list
639
    :returns: Search results
640
    :rtype: List of ZCatalog brains
641
    """
642
643
    # query needs to be a dictionary
644
    if not isinstance(query, dict):
645
        fail("Catalog query needs to be a dictionary")
646
647
    # Portal types to query
648
    portal_types = query.get("portal_type", [])
649
    # We want the portal_type as a list
650
    if not isinstance(portal_types, (tuple, list)):
651
        portal_types = [portal_types]
652
653
    # The catalogs used for the query
654
    catalogs = []
655
656
    # The user did **not** specify a catalog
657
    if catalog is _marker:
658
        # Find the registered catalogs for the queried portal types
659
        for portal_type in portal_types:
660
            # Just get the first registered/default catalog
661
            catalogs.append(get_catalogs_for(
662
                portal_type, default="portal_catalog")[0])
663
    else:
664
        # User defined catalogs
665
        if isinstance(catalog, (list, tuple)):
666
            catalogs.extend(map(get_tool, catalog))
667
        else:
668
            catalogs.append(get_tool(catalog))
669
670
    # Cleanup: Avoid duplicate catalogs
671
    catalogs = list(set(catalogs)) or [get_portal_catalog()]
672
673
    # We only support **single** catalog queries
674
    if len(catalogs) > 1:
675
        fail("Multi Catalog Queries are not supported!")
676
677
    return catalogs[0](query)
678
679
680
def safe_getattr(brain_or_object, attr, default=_marker):
681
    """Return the attribute value
682
683
    :param brain_or_object: A single catalog brain or content object
684
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
685
    :param attr: Attribute name
686
    :type attr: str
687
    :returns: Attribute value
688
    :rtype: obj
689
    """
690
    try:
691
        value = getattr(brain_or_object, attr, _marker)
692
        if value is _marker:
693
            if default is not _marker:
694
                return default
695
            fail("Attribute '{}' not found.".format(attr))
696
        if callable(value):
697
            return value()
698
        return value
699
    except Unauthorized:
700
        if default is not _marker:
701
            return default
702
        fail("You are not authorized to access '{}' of '{}'.".format(
703
            attr, repr(brain_or_object)))
704
705
706
def get_portal_catalog():
707
    """Get the portal catalog tool
708
709
    :returns: Portal Catalog Tool
710
    """
711
    return get_tool("portal_catalog")
712
713
714
def get_review_history(brain_or_object, rev=True):
715
    """Get the review history for the given brain or context.
716
717
    :param brain_or_object: A single catalog brain or content object
718
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
719
    :returns: Workflow history
720
    :rtype: [{}, ...]
721
    """
722
    obj = get_object(brain_or_object)
723
    review_history = []
724
    try:
725
        workflow = get_tool("portal_workflow")
726
        review_history = workflow.getInfoFor(obj, 'review_history')
727
    except WorkflowException as e:
728
        message = str(e)
729
        logger.error("Cannot retrieve review_history on {}: {}".format(
730
            obj, message))
731
    if not isinstance(review_history, (list, tuple)):
732
        logger.error("get_review_history: expected list, received {}".format(
733
            review_history))
734
        review_history = []
735
736
    if isinstance(review_history, tuple):
737
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
738
        # tuple when "review_history" is passed in:
739
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
740
        #
741
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
742
        # Expression.StateChangeInfo, that always returns a list, except when no
743
        # review_history is found:
744
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
745
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
746
        review_history = list(review_history)
747
748
    if rev is True:
749
        review_history.reverse()
750
    return review_history
751
752
753
def get_revision_history(brain_or_object):
754
    """Get the revision history for the given brain or context.
755
756
    :param brain_or_object: A single catalog brain or content object
757
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
758
    :returns: Workflow history
759
    :rtype: obj
760
    """
761
    obj = get_object(brain_or_object)
762
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
763
    return chv.fullHistory()
764
765
766
def get_workflows_for(brain_or_object):
767
    """Get the assigned workflows for the given brain or context.
768
769
    Note: This function supports also the portal_type as parameter.
770
771
    :param brain_or_object: A single catalog brain or content object
772
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
773
    :returns: Assigned Workflows
774
    :rtype: tuple
775
    """
776
    workflow = ploneapi.portal.get_tool("portal_workflow")
777
    if isinstance(brain_or_object, six.string_types):
778
        return workflow.getChainFor(brain_or_object)
779
    obj = get_object(brain_or_object)
780
    return workflow.getChainFor(obj)
781
782
783
def get_workflow_status_of(brain_or_object, state_var="review_state"):
784
    """Get the current workflow status of the given brain or context.
785
786
    :param brain_or_object: A single catalog brain or content object
787
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
788
    :param state_var: The name of the state variable
789
    :type state_var: string
790
    :returns: Status
791
    :rtype: str
792
    """
793
    # Try to get the state from the catalog brain first
794
    if is_brain(brain_or_object):
795
        if state_var in brain_or_object.schema():
796
            return brain_or_object[state_var]
797
798
    # Retrieve the sate from the object
799
    workflow = get_tool("portal_workflow")
800
    obj = get_object(brain_or_object)
801
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
802
803
804
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
805
    """Get the previous workflow status of the object
806
807
    :param brain_or_object: A single catalog brain or content object
808
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
809
    :param skip: Workflow states to skip
810
    :type skip: tuple/list
811
    :returns: status
812
    :rtype: str
813
    """
814
815
    skip = isinstance(skip, (list, tuple)) and skip or []
816
    history = get_review_history(brain_or_object)
817
818
    # Remove consecutive duplicates, some transitions might happen more than
819
    # once consecutively (e.g. publish)
820
    history = map(lambda i: i[0], groupby(history))
821
822
    for num, item in enumerate(history):
823
        # skip the current history entry
824
        if num == 0:
825
            continue
826
        status = item.get("review_state")
827
        if status in skip:
828
            continue
829
        return status
830
    return default
831
832
833
def get_creation_date(brain_or_object):
834
    """Get the creation date of the brain or object
835
836
    :param brain_or_object: A single catalog brain or content object
837
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
838
    :returns: Creation date
839
    :rtype: DateTime
840
    """
841
    created = getattr(brain_or_object, "created", None)
842
    if created is None:
843
        fail("Object {} has no creation date ".format(
844
             repr(brain_or_object)))
845
    if callable(created):
846
        return created()
847
    return created
848
849
850
def get_modification_date(brain_or_object):
851
    """Get the modification date of the brain or object
852
853
    :param brain_or_object: A single catalog brain or content object
854
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
855
    :returns: Modification date
856
    :rtype: DateTime
857
    """
858
    modified = getattr(brain_or_object, "modified", None)
859
    if modified is None:
860
        fail("Object {} has no modification date ".format(
861
             repr(brain_or_object)))
862
    if callable(modified):
863
        return modified()
864
    return modified
865
866
867
def get_review_status(brain_or_object):
868
    """Get the `review_state` of an object
869
870
    :param brain_or_object: A single catalog brain or content object
871
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
872
    :returns: Value of the review_status variable
873
    :rtype: String
874
    """
875
    if is_brain(brain_or_object):
876
        return brain_or_object.review_state
877
    return get_workflow_status_of(brain_or_object, state_var="review_state")
878
879
880
def is_active(brain_or_object):
881
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
882
883
    :param brain_or_object: A single catalog brain or content object
884
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
885
    :returns: False if the object is in the state 'inactive' or 'cancelled'
886
    :rtype: bool
887
    """
888
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
889
        return False
890
    return True
891
892
893
def get_catalogs_for(brain_or_object, default="portal_catalog"):
894
    """Get all registered catalogs for the given portal_type, catalog brain or
895
    content object
896
897
    :param brain_or_object: The portal_type, a catalog brain or content object
898
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
899
    :returns: List of supported catalogs
900
    :rtype: list
901
    """
902
    archetype_tool = get_tool("archetype_tool", default=None)
903
    if archetype_tool is None:
904
        # return the default catalog
905
        return [get_tool(default, default="portal_catalog")]
906
907
    catalogs = []
908
909
    # get the registered catalogs for portal_type
910
    if is_object(brain_or_object):
911
        catalogs = archetype_tool.getCatalogsByType(
912
            get_portal_type(brain_or_object))
913
    if isinstance(brain_or_object, six.string_types):
914
        catalogs = archetype_tool.getCatalogsByType(brain_or_object)
915
916
    if not catalogs:
917
        return [get_tool(default)]
918
    return catalogs
919
920
921
def get_transitions_for(brain_or_object):
922
    """List available workflow transitions for all workflows
923
924
    :param brain_or_object: A single catalog brain or content object
925
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
926
    :returns: All possible available and allowed transitions
927
    :rtype: list[dict]
928
    """
929
    workflow = get_tool('portal_workflow')
930
    transitions = []
931
    instance = get_object(brain_or_object)
932
    for wfid in get_workflows_for(brain_or_object):
933
        wf = workflow[wfid]
934
        tlist = wf.getTransitionsFor(instance)
935
        transitions.extend([t for t in tlist if t not in transitions])
936
    return transitions
937
938
939
def do_transition_for(brain_or_object, transition):
940
    """Performs a workflow transition for the passed in object.
941
942
    :param brain_or_object: A single catalog brain or content object
943
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
944
    :returns: The object where the transtion was performed
945
    """
946
    if not isinstance(transition, six.string_types):
947
        fail("Transition type needs to be string, got '%s'" % type(transition))
948
    obj = get_object(brain_or_object)
949
    try:
950
        ploneapi.content.transition(obj, transition)
951
    except ploneapi.exc.InvalidParameterError as e:
952
        fail("Failed to perform transition '{}' on {}: {}".format(
953
             transition, obj, str(e)))
954
    return obj
955
956
957
def get_roles_for_permission(permission, brain_or_object):
958
    """Get a list of granted roles for the given permission on the object.
959
960
    :param brain_or_object: A single catalog brain or content object
961
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
962
    :returns: Roles for the given Permission
963
    :rtype: list
964
    """
965
    obj = get_object(brain_or_object)
966
    allowed = set(rolesForPermissionOn(permission, obj))
967
    return sorted(allowed)
968
969
970
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
971
    """Checks if the passed in object is versionable.
972
973
    :param brain_or_object: A single catalog brain or content object
974
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
975
    :returns: True if the object is versionable
976
    :rtype: bool
977
    """
978
    pr = get_tool("portal_repository")
979
    obj = get_object(brain_or_object)
980
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
981
        and pr.isVersionable(obj)
982
983
984
def get_version(brain_or_object):
985
    """Get the version of the current object
986
987
    :param brain_or_object: A single catalog brain or content object
988
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
989
    :returns: The current version of the object, or None if not available
990
    :rtype: int or None
991
    """
992
    obj = get_object(brain_or_object)
993
    if not is_versionable(obj):
994
        return None
995
    return getattr(aq_base(obj), "version_id", 0)
996
997
998
def get_view(name, context=None, request=None, default=None):
999
    """Get the view by name
1000
1001
    :param name: The name of the view
1002
    :type name: str
1003
    :param context: The context to query the view
1004
    :type context: ATContentType/DexterityContentType/CatalogBrain
1005
    :param request: The request to query the view
1006
    :type request: HTTPRequest object
1007
    :returns: HTTP Request
1008
    :rtype: Products.Five.metaclass View object
1009
    """
1010
    context = context or get_portal()
1011
    request = request or get_request() or None
1012
    view = queryMultiAdapter((get_object(context), request), name=name)
1013
    if view is None:
1014
        return default
1015
    return view
1016
1017
1018
def get_request():
1019
    """Get the global request object
1020
1021
    :returns: HTTP Request
1022
    :rtype: HTTPRequest object
1023
    """
1024
    return globalrequest.getRequest()
1025
1026
1027
def get_group(group_or_groupname):
1028
    """Return Plone Group
1029
1030
    :param group_or_groupname: Plone group or the name of the group
1031
    :type groupname:  GroupData/str
1032
    :returns: Plone GroupData
1033
    """
1034
    if not group_or_groupname:
1035
1036
        return None
1037
    if hasattr(group_or_groupname, "_getGroup"):
1038
        return group_or_groupname
1039
    gtool = get_tool("portal_groups")
1040
    return gtool.getGroupById(group_or_groupname)
1041
1042
1043
def get_user(user_or_username):
1044
    """Return Plone User
1045
1046
    :param user_or_username: Plone user or user id
1047
    :returns: Plone MemberData
1048
    """
1049
    user = None
1050
    if isinstance(user_or_username, MemberData):
1051
        user = user_or_username
1052
    if isinstance(user_or_username, six.string_types):
1053
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1054
    return user
1055
1056
1057
def get_user_properties(user_or_username):
1058
    """Return User Properties
1059
1060
    :param user_or_username: Plone group identifier
1061
    :returns: Plone MemberData
1062
    """
1063
    user = get_user(user_or_username)
1064
    if user is None:
1065
        return {}
1066
    if not callable(user.getUser):
1067
        return {}
1068
    out = {}
1069
    plone_user = user.getUser()
1070
    for sheet in plone_user.listPropertysheets():
1071
        ps = plone_user.getPropertysheet(sheet)
1072
        out.update(dict(ps.propertyItems()))
1073
    return out
1074
1075
1076
def get_users_by_roles(roles=None):
1077
    """Search Plone users by their roles
1078
1079
    :param roles: Plone role name or list of roles
1080
    :type roles:  list/str
1081
    :returns: List of Plone users having the role(s)
1082
    """
1083
    if not isinstance(roles, (tuple, list)):
1084
        roles = [roles]
1085
    mtool = get_tool("portal_membership")
1086
    return mtool.searchForMembers(roles=roles)
1087
1088
1089
def get_current_user():
1090
    """Returns the current logged in user
1091
1092
    :returns: Current User
1093
    """
1094
    return ploneapi.user.get_current()
1095
1096
1097
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1098
    """Returns the associated contact of a Plone user
1099
1100
    If the user passed in has no contact associated, return None.
1101
    The `contact_types` parameter filter the portal types for the search.
1102
1103
    :param: Plone user
1104
    :contact_types: List with the contact portal types to search
1105
    :returns: Contact associated to the Plone user or None
1106
    """
1107
    if not user:
1108
        return None
1109
1110
    query = {'portal_type': contact_types, 'getUsername': user.id}
1111
    brains = search(query, catalog='portal_catalog')
1112
    if not brains:
1113
        return None
1114
1115
    if len(brains) > 1:
1116
        # Oops, the user has multiple contacts assigned, return None
1117
        contacts = map(lambda c: c.Title, brains)
1118
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1119
        err_msg = err_msg.format(user.id, ','.join(contacts))
1120
        logger.error(err_msg)
1121
        return None
1122
1123
    return get_object(brains[0])
1124
1125
1126
def get_user_client(user_or_contact):
1127
    """Returns the client of the contact of a Plone user
1128
1129
    If the user passed in has no contact or does not belong to any client,
1130
    returns None.
1131
1132
    :param: Plone user or contact
1133
    :returns: Client the contact of the Plone user belongs to
1134
    """
1135
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1136
        # Lab contacts cannot belong to a client
1137
        return None
1138
1139
    if not IContact.providedBy(user_or_contact):
1140
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1141
        if IContact.providedBy(contact):
1142
            return get_user_client(contact)
1143
        return None
1144
1145
    client = get_parent(user_or_contact)
1146
    if client and IClient.providedBy(client):
1147
        return client
1148
1149
    return None
1150
1151
1152
def get_current_client():
1153
    """Returns the current client the current logged in user belongs to, if any
1154
1155
    :returns: Client the current logged in user belongs to or None
1156
    """
1157
    return get_user_client(get_current_user())
1158
1159
1160
def get_cache_key(brain_or_object):
1161
    """Generate a cache key for a common brain or object
1162
1163
    :param brain_or_object: A single catalog brain or content object
1164
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1165
    :returns: Cache Key
1166
    :rtype: str
1167
    """
1168
    key = [
1169
        get_portal_type(brain_or_object),
1170
        get_id(brain_or_object),
1171
        get_uid(brain_or_object),
1172
        # handle different domains gracefully
1173
        get_url(brain_or_object),
1174
        # Return the microsecond since the epoch in GMT
1175
        get_modification_date(brain_or_object).micros(),
1176
    ]
1177
    return "-".join(map(lambda x: str(x), key))
1178
1179
1180
def bika_cache_key_decorator(method, self, brain_or_object):
1181
    """Bika cache key decorator usable for
1182
1183
    :param brain_or_object: A single catalog brain or content object
1184
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1185
    :returns: Cache Key
1186
    :rtype: str
1187
    """
1188
    if brain_or_object is None:
1189
        raise DontCache
1190
    return get_cache_key(brain_or_object)
1191
1192
1193
def normalize_id(string):
1194
    """Normalize the id
1195
1196
    :param string: A string to normalize
1197
    :type string: str
1198
    :returns: Normalized ID
1199
    :rtype: str
1200
    """
1201
    if not isinstance(string, six.string_types):
1202
        fail("Type of argument must be string, found '{}'"
1203
             .format(type(string)))
1204
    # get the id nomalizer utility
1205
    normalizer = getUtility(IIDNormalizer).normalize
1206
    return normalizer(string)
1207
1208
1209
def normalize_filename(string):
1210
    """Normalize the filename
1211
1212
    :param string: A string to normalize
1213
    :type string: str
1214
    :returns: Normalized ID
1215
    :rtype: str
1216
    """
1217
    if not isinstance(string, six.string_types):
1218
        fail("Type of argument must be string, found '{}'"
1219
             .format(type(string)))
1220
    # get the file nomalizer utility
1221
    normalizer = getUtility(IFileNameNormalizer).normalize
1222
    return normalizer(string)
1223
1224
1225
def is_uid(uid, validate=False):
1226
    """Checks if the passed in uid is a valid UID
1227
1228
    :param uid: The uid to check
1229
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1230
    True, also verifies if a brain exists for the uid passed in
1231
    :type uid: string
1232
    :return: True if a valid uid
1233
    :rtype: bool
1234
    """
1235
    if not isinstance(uid, six.string_types):
1236
        return False
1237
    if uid == '0':
1238
        return True
1239
    if len(uid) != 32:
1240
        return False
1241
    if not UID_RX.match(uid):
1242
        return False
1243
    if not validate:
1244
        return True
1245
1246
    # Check if a brain for this uid exists
1247
    uc = get_tool('uid_catalog')
1248
    brains = uc(UID=uid)
1249
    if brains:
1250
        assert (len(brains) == 1)
1251
    return len(brains) > 0
1252
1253
1254
def is_date(date):
1255
    """Checks if the passed in value is a valid Zope's DateTime
1256
1257
    :param date: The date to check
1258
    :type date: DateTime
1259
    :return: True if a valid date
1260
    :rtype: bool
1261
    """
1262
    if not date:
1263
        return False
1264
    return isinstance(date, (DateTime, datetime))
1265
1266
1267
def to_date(value, default=None):
1268
    """Tries to convert the passed in value to Zope's DateTime
1269
1270
    :param value: The value to be converted to a valid DateTime
1271
    :type value: str, DateTime or datetime
1272
    :return: The DateTime representation of the value passed in or default
1273
    """
1274
    if isinstance(value, DateTime):
1275
        return value
1276
    if not value:
1277
        if default is None:
1278
            return None
1279
        return to_date(default)
1280
    try:
1281
        if isinstance(value, str) and '.' in value:
1282
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
1283
            return DateTime(value, datefmt='international')
1284
        return DateTime(value)
1285
    except (TypeError, ValueError, DateTimeError):
1286
        return to_date(default)
1287
1288
1289
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1290
               round_to_int=True):
1291
    """Returns the computed total number of minutes
1292
    """
1293
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1294
        float(seconds)/60 + float(milliseconds)/1000/60
1295
    return int(round(total)) if round_to_int else total
1296
1297
1298
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1299
    """Returns a representation of time in a string in xd yh zm format
1300
    """
1301
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1302
                         seconds=seconds, milliseconds=milliseconds)
1303
    delta = timedelta(minutes=int(round(minutes)))
1304
    d = delta.days
1305
    h = delta.seconds // 3600
1306
    m = (delta.seconds // 60) % 60
1307
    m = m and "{}m ".format(str(m)) or ""
1308
    d = d and "{}d ".format(str(d)) or ""
1309
    if m and d:
1310
        h = "{}h ".format(str(h))
1311
    else:
1312
        h = h and "{}h ".format(str(h)) or ""
1313
    return "".join([d, h, m]).strip()
1314
1315
1316
def to_int(value, default=_marker):
1317
    """Tries to convert the value to int.
1318
    Truncates at the decimal point if the value is a float
1319
1320
    :param value: The value to be converted to an int
1321
    :return: The resulting int or default
1322
    """
1323
    if is_floatable(value):
1324
        value = to_float(value)
1325
    try:
1326
        return int(value)
1327
    except (TypeError, ValueError):
1328
        if default is None:
1329
            return default
1330
        if default is not _marker:
1331
            return to_int(default)
1332
        fail("Value %s cannot be converted to int" % repr(value))
1333
1334
1335
def is_floatable(value):
1336
    """Checks if the passed in value is a valid floatable number
1337
1338
    :param value: The value to be evaluated as a float number
1339
    :type value: str, float, int
1340
    :returns: True if is a valid float number
1341
    :rtype: bool"""
1342
    try:
1343
        float(value)
1344
        return True
1345
    except (TypeError, ValueError):
1346
        return False
1347
1348
1349
def to_float(value, default=_marker):
1350
    """Converts the passed in value to a float number
1351
1352
    :param value: The value to be converted to a floatable number
1353
    :type value: str, float, int
1354
    :returns: The float number representation of the passed in value
1355
    :rtype: float
1356
    """
1357
    if not is_floatable(value):
1358
        if default is not _marker:
1359
            return to_float(default)
1360
        fail("Value %s is not floatable" % repr(value))
1361
    return float(value)
1362
1363
1364
def to_searchable_text_metadata(value):
1365
    """Parse the given metadata value to searchable text
1366
1367
    :param value: The raw value of the metadata column
1368
    :returns: Searchable and translated unicode value or None
1369
    """
1370
    if not value:
1371
        return u""
1372
    if value is Missing.Value:
1373
        return u""
1374
    if is_uid(value):
1375
        return u""
1376
    if isinstance(value, (bool)):
1377
        return u""
1378
    if isinstance(value, (list, tuple)):
1379
        values = map(to_searchable_text_metadata, value)
1380
        values = filter(None, values)
1381
        return " ".join(values)
1382
    if isinstance(value, dict):
1383
        return to_searchable_text_metadata(value.values())
1384
    if is_date(value):
1385
        return value.strftime("%Y-%m-%d")
1386
    if is_at_content(value):
1387
        return to_searchable_text_metadata(get_title(value))
1388
    if not isinstance(value, six.string_types):
1389
        value = str(value)
1390
    return safe_unicode(value)
1391
1392
1393
def get_registry_record(name, default=None):
1394
    """Returns the value of a registry record
1395
1396
    :param name: [required] name of the registry record
1397
    :type name: str
1398
    :param default: The value returned if the record is not found
1399
    :type default: anything
1400
    :returns: value of the registry record
1401
    """
1402
    return ploneapi.portal.get_registry_record(name, default=default)
1403
1404
1405
def to_display_list(pairs, sort_by="key", allow_empty=True):
1406
    """Create a Plone DisplayList from list items
1407
1408
    :param pairs: list of key, value pairs
1409
    :param sort_by: Sort the items either by key or value
1410
    :param allow_empty: Allow to select an empty value
1411
    :returns: Plone DisplayList
1412
    """
1413
    dl = DisplayList()
1414
1415
    if isinstance(pairs, six.string_types):
1416
        pairs = [pairs, pairs]
1417
    for pair in pairs:
1418
        # pairs is a list of lists -> add each pair
1419
        if isinstance(pair, (tuple, list)):
1420
            dl.add(*pair)
1421
        # pairs is just a single pair -> add it and stop
1422
        if isinstance(pair, six.string_types):
1423
            dl.add(*pairs)
1424
            break
1425
1426
    # add the empty option
1427
    if allow_empty:
1428
        dl.add("", "")
1429
1430
    # sort by key/value
1431
    if sort_by == "key":
1432
        dl = dl.sortedByKey()
1433
    elif sort_by == "value":
1434
        dl = dl.sortedByValue()
1435
1436
    return dl
1437
1438
1439
def text_to_html(text, wrap="p", encoding="utf8"):
1440
    """Convert `\n` sequences in the text to HTML `\n`
1441
1442
    :param text: Plain text to convert
1443
    :param wrap: Toggle to wrap the text in a
1444
    :returns: HTML converted and encoded text
1445
    """
1446
    if not text:
1447
        return ""
1448
    # handle text internally as unicode
1449
    text = safe_unicode(text)
1450
    # replace newline characters with HTML entities
1451
    html = text.replace("\n", "<br/>")
1452
    if wrap:
1453
        html = u"<{tag}>{html}</{tag}>".format(
1454
            tag=wrap, html=html)
1455
    # return encoded html
1456
    return html.encode(encoding)
1457