Passed
Push — 2.x ( b4b250...e86af7 )
by Ramon
05:31
created

bika.lims.api.is_supermodel()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import Missing
22
import re
23
import six
24
from AccessControl.PermissionRole import rolesForPermissionOn
25
from Acquisition import aq_base
26
from bika.lims import logger
27
from bika.lims.interfaces import IClient
28
from bika.lims.interfaces import IContact
29
from bika.lims.interfaces import ILabContact
30
from collections import OrderedDict
31
from datetime import datetime
32
from DateTime import DateTime
33
from datetime import timedelta
34
from DateTime.interfaces import DateTimeError
35
from itertools import groupby
36
from plone import api as ploneapi
37
from plone.api.exc import InvalidParameterError
38
from plone.app.layout.viewlets.content import ContentHistoryView
39
from plone.behavior.interfaces import IBehaviorAssignable
40
from plone.dexterity.interfaces import IDexterityContent
41
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
42
from plone.i18n.normalizer.interfaces import IIDNormalizer
43
from plone.memoize.volatile import DontCache
44
from Products.Archetypes.atapi import DisplayList
45
from Products.Archetypes.BaseObject import BaseObject
46
from Products.CMFCore.interfaces import IFolderish
47
from Products.CMFCore.interfaces import ISiteRoot
48
from Products.CMFCore.utils import getToolByName
49
from Products.CMFCore.WorkflowCore import WorkflowException
50
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
51
from Products.CMFPlone.utils import _createObjectByType
52
from Products.CMFPlone.utils import base_hasattr
53
from Products.CMFPlone.utils import safe_unicode
54
from Products.PlonePAS.tools.memberdata import MemberData
55
from Products.ZCatalog.interfaces import ICatalogBrain
56
from zope import globalrequest
57
from zope.annotation.interfaces import IAttributeAnnotatable
58
from zope.component import getUtility
59
from zope.component import queryMultiAdapter
60
from zope.component.interfaces import IFactory
61
from zope.event import notify
62
from zope.interface import directlyProvides
63
from zope.lifecycleevent import ObjectCreatedEvent
64
from zope.publisher.browser import TestRequest
65
from zope.schema import getFieldsInOrder
66
from zope.security.interfaces import Unauthorized
67
68
"""SENAITE LIMS Framework API
69
70
Please see bika.lims/docs/API.rst for documentation.
71
72
Architectural Notes:
73
74
Please add only functions that do a single thing for a single object.
75
76
Good: `def get_foo(brain_or_object)`
77
Bad:  `def get_foos(list_of_brain_objects)`
78
79
Why?
80
81
Because it makes things more complex. You can always use a pattern like this to
82
achieve the same::
83
84
    >>> foos = map(get_foo, list_of_brain_objects)
85
86
Please add for all of your functions a descriptive test in docs/API.rst.
87
88
Thanks.
89
"""
90
91
# Unique sentinel used as default argument value, so that "no default passed"
# can be told apart from an explicitly passed `None`
_marker = object()
92
93
# Matches 32 characters out of [a-z0-9] directly followed by the string end
UID_RX = re.compile("[a-z0-9]{32}$")
94
95
96
class APIError(Exception):
    """Exception type raised by the bika.lims API, e.g. through `fail`."""
98
99
100
def get_portal():
    """Return the portal (site root) object

    :returns: The object returned by `plone.api.portal.getSite`
    """
    portal = ploneapi.portal.getSite()
    return portal
106
107
108
def get_setup():
    """Return the `bika_setup` folder of the portal

    :returns: The item stored under the id "bika_setup" in the portal
    """
    return get_portal().get("bika_setup")
113
114
115
def get_bika_setup():
    """Return the `bika_setup` folder of the portal

    Alias of `get_setup`.

    :returns: Same value as `get_setup()`
    """
    setup = get_setup()
    return setup
119
120
121
def create(container, portal_type, *args, **kwargs):
    """Creates an object in Bika LIMS

    This code uses most of the parts from the TypesTool
    see: `Products.CMFCore.TypesTool._constructInstance`

    :param container: container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param id: Optional keyword: the id for the new object. A temporary id
        is generated when omitted
    :type id: string
    :param title: Optional keyword: the title for the new content object.
        Defaults to "New <portal_type>"
    :type title: string
    :returns: The new created object
    :raises APIError: if no type information is registered for `portal_type`
    """
    from bika.lims.utils import tmpID

    # `obj_id` instead of `id` to not shadow the builtin
    obj_id = kwargs.pop("id", tmpID())
    title = kwargs.pop("title", "New {}".format(portal_type))

    # get the fti
    types_tool = get_tool("portal_types")
    fti = types_tool.getTypeInfo(portal_type)

    # Fail with a clear message instead of an AttributeError on `None` when
    # the portal type is not registered in the types tool
    if fti is None:
        fail("No type information found for portal type '{}'"
             .format(portal_type))

    if fti.product:
        # old style factory: remaining keywords are passed to `edit`
        obj = _createObjectByType(portal_type, container, obj_id)
        obj.processForm()
        obj.edit(title=title, **kwargs)
    else:
        # newstyle factory
        factory = getUtility(IFactory, fti.factory)
        obj = factory(obj_id, *args, **kwargs)
        if hasattr(obj, '_setPortalTypeName'):
            obj._setPortalTypeName(fti.getId())
        # set the title
        obj.title = safe_unicode(title)
        # notify that the object was created
        notify(ObjectCreatedEvent(obj))
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(obj_id, obj)
        # we get the object here with the current object id, as it might be
        # renamed already by an event handler
        obj = container._getOb(obj.getId())

    return obj
166
167
168
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: Optional object, brain or UID to look up the tool from
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Fallback if the tool is not found. A string is treated
        as the name of another tool to look up, any other value is returned
        as it is
    :returns: Portal Tool
    :raises APIError: if the tool is not found and no default was given
    """
    if context is not None:
        # A context was passed: resolve it and look up the tool from there
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (APIError, AttributeError) as exc:
            # https://github.com/senaite/bika.lims/issues/396
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
                        "-> falling back to plone.api.portal.get_tool('{}')"
                        .format(repr(context), name, repr(exc), name))
            return get_tool(name, default=default)

    # No context: ask the plone api
    try:
        tool = ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        if isinstance(default, six.string_types):
            # the default names another tool
            return get_tool(default)
        return default
    return tool
199
200
201
def fail(msg=None):
    """Raise an `APIError` with the given message

    :param msg: The error reason; "Reason not given." is used if None
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
207
208
209
def is_object(brain_or_object):
    """Check if the passed in value is a supported portal content object

    Supported are the portal root, supermodels, AT contents, dexterity
    contents and catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    :rtype: bool
    """
    # checked in this order, the first positive check wins
    checks = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    for check in checks:
        if check(brain_or_object):
            return True
    return False
227
228
229
def get_object(brain_object_uid, default=_marker):
    """Resolve a brain, supermodel, content object or UID to the full object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Value returned for unsupported input instead of failing
    :returns: The full object
    :raises APIError: if the input is not supported and no default is given
    """
    if is_uid(brain_object_uid):
        return get_object_by_uid(brain_object_uid)
    if is_supermodel(brain_object_uid):
        # supermodels keep the wrapped content in `instance`
        return brain_object_uid.instance
    if is_object(brain_object_uid):
        if is_brain(brain_object_uid):
            return brain_object_uid.getObject()
        # already a content object
        return brain_object_uid
    if default is _marker:
        fail("{} is not supported.".format(repr(brain_object_uid)))
    return default
248
249
250
def is_portal(brain_or_object):
    """Check if the passed in object provides `ISiteRoot`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is the portal root object
    :rtype: bool
    """
    provided = ISiteRoot.providedBy(brain_or_object)
    return provided
259
260
261
def is_brain(brain_or_object):
    """Check if the passed in object provides `ICatalogBrain`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a catalog brain
    :rtype: bool
    """
    provided = ICatalogBrain.providedBy(brain_or_object)
    return provided
270
271
272
def is_supermodel(brain_or_object):
    """Check if the passed in object provides `ISuperModel`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a supermodel
    :rtype: bool
    """
    # imported locally to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel
    provided = ISuperModel.providedBy(brain_or_object)
    return provided
283
284
285
def is_dexterity_content(brain_or_object):
    """Check if the passed in object provides `IDexterityContent`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a dexterity content type
    :rtype: bool
    """
    provided = IDexterityContent.providedBy(brain_or_object)
    return provided
294
295
296
def is_at_content(brain_or_object):
    """Check if the passed in object is an instance of AT `BaseObject`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an AT content type
    :rtype: bool
    """
    at_content = isinstance(brain_or_object, BaseObject)
    return at_content
305
306
307
def is_folderish(brain_or_object):
    """Check if the passed in object is folderish

    An `is_folderish` attribute of the passed in object takes precedence
    (it is called if it is callable). Without such an attribute, the full
    object is checked for `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))
    folderish = brain_or_object.is_folderish
    if callable(folderish):
        return folderish()
    return folderish
320
321
322
def get_portal_type(brain_or_object):
    """Return the `portal_type` attribute of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: if the passed in value is not a supported object
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
333
334
335
def get_schema(brain_or_object):
    """Return the schema of the content

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: for the portal root and for objects which are neither
        dexterity nor AT contents
    """
    obj = get_object(brain_or_object)
    if is_portal(obj):
        fail("get_schema can't return schema of portal root")
    elif is_dexterity_content(obj):
        # dexterity: the schema is looked up through the type information
        type_info = get_tool("portal_types").getTypeInfo(obj.portal_type)
        return type_info.lookupSchema()
    elif is_at_content(obj):
        return obj.Schema()
    fail("{} has no Schema.".format(brain_or_object))
352
353
354
def get_fields(brain_or_object):
    """Return a name to field mapping of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    obj = get_object(brain_or_object)
    schema = get_schema(obj)

    if not is_dexterity_content(obj):
        return OrderedDict(schema)

    # fields directly provided by the schema interface come first ...
    fields = getFieldsInOrder(schema)
    # ... followed by the fields of the enumerated behaviors
    assignable = IBehaviorAssignable(obj)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            fields.extend(getFieldsInOrder(behavior.interface))
    return OrderedDict(fields)
375
376
377
def get_id(brain_or_object):
    """Return the Plone ID of the brain or object

    For brains, the `getId` or `id` attribute of the brain itself is used if
    available. Otherwise `getId()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for name in ("getId", "id"):
            if base_hasattr(brain_or_object, name):
                return getattr(brain_or_object, name)
    return get_object(brain_or_object).getId()
391
392
393
def get_title(brain_or_object):
    """Return the title of the brain or object

    The `Title` attribute of a brain is used if available, otherwise
    `Title()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Title")
    if not from_brain:
        return get_object(brain_or_object).Title()
    return brain_or_object.Title
404
405
406
def get_description(brain_or_object):
    """Return the description of the brain or object

    The `Description` attribute of a brain is used if available, otherwise
    `Description()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Description")
    if not from_brain:
        return get_object(brain_or_object).Description()
    return brain_or_object.Description
418
419
420
def get_uid(brain_or_object):
    """Return the Plone UID of the brain or object

    A passed in UID is returned unchanged and the portal root always has the
    UID '0'.

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_uid(brain_or_object):
        return brain_or_object
    if is_portal(brain_or_object):
        return '0'
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "UID")
    if not from_brain:
        return get_object(brain_or_object).UID()
    return brain_or_object.UID
435
436
437
def get_url(brain_or_object):
    """Return the absolute URL of the brain or object

    `getURL()` of a brain is used if available, otherwise `absolute_url()`
    of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getURL")
    if not from_brain:
        return get_object(brain_or_object).absolute_url()
    return brain_or_object.getURL()
448
449
450
def get_icon(brain_or_object, html_tag=True):
    """Get the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url. An
        empty string if the type information provides no icon
    :rtype: string
    """
    # `html.escape` on Python 3, `cgi.escape` on Python 2
    try:
        from html import escape
    except ImportError:
        from cgi import escape

    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url
    # The title is user-provided content that ends up inside an HTML
    # attribute: escape it (including quotes) to keep the markup intact
    title = escape(get_title(brain_or_object), quote=True)
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
        url=url, title=title)
    return tag
474
475
476
def get_object_by_uid(uid, default=_marker):
    """Find an object by a given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned instead of failing if the UID is empty
        or no object is found
    :returns: Found object, the portal for the UID '0', or the default
    :raises APIError: if no default is given and the UID is empty or
        no object is found
    """
    # an empty UID can not be resolved
    if not uid:
        if default is _marker:
            fail("get_object_by_uid requires UID as first argument; "
                 "got {} instead".format(uid))
        return default

    # we defined the portal object UID to be '0'
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)
    if brain is not None:
        return get_object(brain)

    if default is _marker:
        fail("No object found for UID {}".format(uid))
    return default
503
504
505
def get_brain_by_uid(uid, default=None):
    """Query the `uid_catalog` for the brain with the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if `uid` is no UID or if the catalog
        does not return exactly one result
    :returns: ZCatalog brain or the default
    """
    if not is_uid(uid):
        return default

    uid_catalog = get_tool("uid_catalog")
    matches = uid_catalog(UID=uid)

    # only an unambiguous result is accepted
    return matches[0] if len(matches) == 1 else default
523
524
525
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path or the absolute URL of the object to find
    :type path: string
    :param default: Value returned instead of failing if the path is empty
        or not inside the portal, and also if the traversal finds nothing
    :returns: Found object. If nothing is found at the path, the default is
        returned, or None when no default was passed
    :raises APIError: if no default is given and the path is empty or not
        located inside the portal
    """
    no_default = default is _marker

    # nothing to do here
    if not path:
        if no_default:
            fail("get_object_by_path first argument must be a path; "
                 "{} received".format(path))
        return default

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if no_default:
            fail("Not a physical path inside the portal.")
        return default

    if path == portal_path:
        return portal

    # Never hand the private `_marker` sentinel to the traversal: it would
    # be returned to the caller as if it were the object found
    fallback = None if no_default else default
    return portal.restrictedTraverse(path, fallback)
558
559
560
def get_path(brain_or_object):
    """Return the physical path of the brain or object

    `getPath()` is used for brains. For anything else the segments returned
    by `getPhysicalPath()` of the full object are joined with "/".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
571
572
573
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the brain or object

    The parent path of the portal is the path of the portal itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if not is_brain(brain_or_object):
        parent = get_object(brain_or_object).aq_parent
        return get_path(parent)
    # brains: cut off the last segment of the own path
    own_path = get_path(brain_or_object)
    return own_path.rpartition("/")[0]
587
588
589
def get_parent(brain_or_object, catalog_search=False):
    """Locate the parent object of the content/catalog brain

    With `catalog_search` enabled, the `portal_catalog` is queried for the
    parent path and a brain is returned instead of the full parent object.
    If that search has no results, the full parent object is returned. The
    portal itself is always returned as full object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if is_portal(brain_or_object):
        return get_portal()

    # Default: return the acquisition parent of the full object
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # The parent is the portal object
    if parent_path == get_path(get_portal()):
        return get_portal()

    # Query the catalog for exactly the parent path
    catalog = get_portal_catalog()
    brains = catalog(path={"query": parent_path, "depth": 0})
    if brains:
        return brains[0]

    # No results fallback: return the parent object
    return get_object(brain_or_object).aq_parent
631
632
633
def search(query, catalog=_marker):
    """Search for objects in a single catalog

    If no catalog is passed, the first catalog registered for each queried
    portal type is used. Without any catalog at all, the `portal_catalog`
    is searched.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises APIError: if the query is no dictionary or if more than one
        distinct catalog would have to be searched
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # Queried portal types, always handled as a sequence
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is _marker:
        # No catalog specified: take the first registered/default catalog
        # of each queried portal type
        catalogs = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in portal_types
        ]
    elif isinstance(catalog, (list, tuple)):
        catalogs = list(map(get_tool, catalog))
    else:
        catalogs = [get_tool(catalog)]

    # Drop duplicates and fall back to the portal catalog
    catalogs = list(set(catalogs)) or [get_portal_catalog()]

    # We only support **single** catalog queries
    if len(catalogs) > 1:
        fail("Multi Catalog Queries are not supported!")

    return catalogs[0](query)
679
680
681
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of the attribute; callables are called

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value returned instead of failing if the attribute is
        missing or if accessing it raises `Unauthorized`
    :returns: Attribute value
    :rtype: obj
    :raises APIError: if no default is given and the attribute is missing
        or not accessible
    """
    has_default = default is not _marker
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is not _marker:
            # the call happens inside the `try` to catch Unauthorized, too
            return value() if callable(value) else value
        if has_default:
            return default
        fail("Attribute '{}' not found.".format(attr))
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
705
706
707
def get_portal_catalog():
    """Return the `portal_catalog` tool

    :returns: Portal Catalog Tool
    """
    catalog = get_tool("portal_catalog")
    return catalog
713
714
715
def get_review_history(brain_or_object, rev=True):
    """Return the review history of the given brain or context

    Workflow errors are logged and lead to an empty history.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: Reverse the order of the history if (and only if) `True`
    :type rev: bool
    :returns: Workflow history
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        workflow_tool = get_tool("portal_workflow")
        history = workflow_tool.getInfoFor(obj, 'review_history')
    except WorkflowException as exc:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(exc)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
        # Expression.StateChangeInfo, that always returns a list, except when
        # no review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
752
753
754
def get_revision_history(brain_or_object):
    """Return the revision history of the given brain or context

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The result of `ContentHistoryView.fullHistory` for the object
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # the view is created with the request of the object, or None
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
765
766
767
def get_workflows_for(brain_or_object):
    """Return the workflow chain of the given brain or context

    Note: This function supports also the portal_type as parameter.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    workflow_tool = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    # strings are passed on as they are, anything else is resolved first
    if not isinstance(target, six.string_types):
        target = get_object(target)
    return workflow_tool.getChainFor(target)
782
783
784
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the current workflow status of the given brain or context

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status
    :rtype: str
    """
    # Prefer the value stored on the brain, if its schema knows the variable
    on_brain = is_brain(brain_or_object) \
        and state_var in brain_or_object.schema()
    if on_brain:
        return brain_or_object[state_var]

    # Otherwise ask the workflow tool for the state of the full object
    workflow_tool = get_tool("portal_workflow")
    obj = get_object(brain_or_object)
    return workflow_tool.getInfoFor(ob=obj, name=state_var, default='')
803
804
805
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the previous workflow status of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value returned if there is no previous status
    :returns: status
    :rtype: str
    """
    # anything but a non-empty list/tuple means: skip nothing
    ignored = skip if isinstance(skip, (list, tuple)) and skip else []

    # Collapse consecutive duplicates, some transitions might happen more
    # than once consecutively (e.g. publish)
    review_history = get_review_history(brain_or_object)
    entries = [entry for entry, _group in groupby(review_history)]

    # the first entry is the current one, so start with the second
    for entry in entries[1:]:
        status = entry.get("review_state")
        if status not in ignored:
            return status
    return default
832
833
834
def get_creation_date(brain_or_object):
    """Return the creation date of the brain or object

    Reads the `created` attribute and calls it if it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    :raises APIError: if `created` is missing or None
    """
    created = getattr(brain_or_object, "created", None)
    if created is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    return created() if callable(created) else created
849
850
851
def get_modification_date(brain_or_object):
    """Return the modification date of the brain or object

    Reads the `modified` attribute and calls it if it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    :raises APIError: if `modified` is missing or None
    """
    modified = getattr(brain_or_object, "modified", None)
    if modified is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    return modified() if callable(modified) else modified
866
867
868
def get_review_status(brain_or_object):
    """Return the `review_state` of a catalog brain or content object

    For catalog brains the `review_state` attribute is returned directly. For
    anything else the status is looked up with `get_workflow_status_of`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
879
880
881
def is_active(brain_or_object):
    """Check that the object is neither 'inactive' nor 'cancelled'

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the review status of the object is 'inactive' or
        'cancelled', True otherwise
    :rtype: bool
    """
    status = get_review_status(brain_or_object)
    return status not in ["cancelled", "inactive"]
892
893
894
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Return the catalogs registered for a portal_type, brain or object

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog tool to fall back to when no catalog
        is registered for the type
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", default=None)
    if at_tool is None:
        # without the archetype tool only the default catalog can be returned
        fallback = get_tool(default, default="portal_catalog")
        return [fallback]

    found = []

    # lookup by the portal type of the brain/object ...
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        found = at_tool.getCatalogsByType(portal_type)
    # ... or directly, when a portal type name was passed in
    if isinstance(brain_or_object, six.string_types):
        found = at_tool.getCatalogsByType(brain_or_object)

    return found if found else [get_tool(default)]
920
921
922
def get_transitions_for(brain_or_object):
    """Collect the available transitions of all workflows of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    obj = get_object(brain_or_object)
    for workflow_id in get_workflows_for(brain_or_object):
        candidates = wf_tool[workflow_id].getTransitionsFor(obj)
        # only add transitions not already collected from a previous workflow
        fresh = [item for item in candidates if item not in collected]
        collected += fresh
    return collected
938
939
940
def do_transition_for(brain_or_object, transition):
    """Perform a workflow transition on the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: Id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    is_text = isinstance(transition, six.string_types)
    if not is_text:
        fail("Transition type needs to be string, got '%s'" % type(transition))
    target = get_object(brain_or_object)
    try:
        ploneapi.content.transition(target, transition)
    except ploneapi.exc.InvalidParameterError as error:
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, target, str(error))
        fail(message)
    return target
956
957
958
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted roles granted for the permission on the object

    :param permission: The permission to get the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    context = get_object(brain_or_object)
    # the set removes duplicate roles before sorting
    return sorted(set(rolesForPermissionOn(permission, context)))
969
970
971
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honour the `policy` argument; it was previously ignored in favour of a
    # hard-coded policy id (the default keeps the former behavior)
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
983
984
985
def get_version(brain_or_object):
    """Return the version of the current object

    The version is read from the `version_id` attribute of the object
    stripped of its acquisition wrapper, falling back to 0 if it is not set.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object, or None if not available
    :rtype: int or None
    """
    instance = get_object(brain_or_object)
    if is_versionable(instance):
        return getattr(aq_base(instance), "version_id", 0)
    return None
997
998
999
def get_view(name, context=None, request=None, default=None):
    """Look up a view by name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view, the portal if omitted
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view, the global request if
        omitted
    :type request: HTTPRequest object
    :param default: Value returned if no view was found
    :returns: The view registered for context and request, or `default`
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    adapted = (get_object(context), request)
    found = queryMultiAdapter(adapted, name=name)
    return default if found is None else found
1017
1018
1019
def get_request():
    """Return the global request object

    :returns: The request provided by `globalrequest.getRequest`
    :rtype: HTTPRequest object
    """
    current = globalrequest.getRequest()
    return current
1026
1027
1028
def get_test_request():
    """Return a new TestRequest marked with `IAttributeAnnotatable`

    :returns: The freshly created TestRequest object
    """
    test_request = TestRequest()
    directlyProvides(test_request, IAttributeAnnotatable)
    return test_request
1034
1035
1036
def get_group(group_or_groupname):
    """Return the Plone group for the given group or group name

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData, or None if nothing was passed in
    """
    if not group_or_groupname:
        return None
    # anything providing `_getGroup` is treated as a group object already
    already_group = hasattr(group_or_groupname, "_getGroup")
    if already_group:
        return group_or_groupname
    return get_tool("portal_groups").getGroupById(group_or_groupname)
1050
1051
1052
def get_user(user_or_username):
    """Return the Plone user for the given user or user name

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None if the value passed in is neither a
        MemberData object nor a string
    """
    if isinstance(user_or_username, six.string_types):
        # resolve the member by its login name
        return get_member_by_login_name(get_portal(), user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1064
1065
1066
def get_user_properties(user_or_username):
    """Return the properties of a user

    The properties of all property sheets of the user are merged into a
    single dictionary. Sheets processed later override equally named
    properties of earlier ones.

    :param user_or_username: Plone user or user id
    :returns: Dictionary of user properties, empty if the user was not found
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}
    account = member.getUser()
    properties = {}
    for sheet_id in account.listPropertysheets():
        sheet = account.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1083
1084
1085
def get_users_by_roles(roles=None):
    """Search Plone users by their roles

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    # a single value is wrapped into a list before searching
    wanted = roles if isinstance(roles, (tuple, list)) else [roles]
    membership = get_tool("portal_membership")
    return membership.searchForMembers(roles=wanted)
1096
1097
1098
def get_current_user():
    """Return the user that is currently logged in

    :returns: Current user as returned by `ploneapi.user.get_current`
    """
    current = ploneapi.user.get_current()
    return current
1104
1105
1106
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Return the contact associated to a Plone user

    None is returned if the user has no contact associated, and also if more
    than one contact is found (an error is logged in that case). The
    `contact_types` parameter filters the portal types for the search.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    criteria = {'portal_type': contact_types, 'getUsername': user.id}
    results = search(criteria, catalog='portal_catalog')
    if not results:
        return None

    if len(results) > 1:
        # ambiguous: the user is linked to several contacts
        titles = [brain.Title for brain in results]
        template = "User '{}' is bound to multiple Contacts '{}'"
        logger.error(template.format(user.id, ','.join(titles)))
        return None

    return get_object(results[0])
1133
1134
1135
def get_user_client(user_or_contact):
    """Return the client the contact of a Plone user belongs to

    None is returned if the user passed in has no contact or if the contact
    does not belong to any client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact:
        return None
    if ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # the client is expected to be the parent of the contact
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # a user was passed in: resolve its client contact first
    linked = get_user_contact(user_or_contact, contact_types=['Contact'])
    if not IContact.providedBy(linked):
        return None
    return get_user_client(linked)
1159
1160
1161
def get_current_client():
    """Return the client the current logged in user belongs to, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
1167
1168
1169
def get_cache_key(brain_or_object):
    """Build a cache key for a common brain or object

    The key joins portal type, id, UID, URL and modification time of the
    object with dashes.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # the URL handles different domains gracefully
        get_url(brain_or_object),
        # microseconds since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join([str(part) for part in parts])
1187
1188
1189
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object

    `method` and `self` are accepted but not used to build the key.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1200
1201
1202
def normalize_id(string):
    """Normalize the passed in string with the `IIDNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    is_text = isinstance(string, six.string_types)
    if not is_text:
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    # delegate to the registered id normalizer utility
    utility = getUtility(IIDNormalizer)
    return utility.normalize(string)
1216
1217
1218
def normalize_filename(string):
    """Normalize the passed in string with the `IFileNameNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    is_text = isinstance(string, six.string_types)
    if not is_text:
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    # delegate to the registered file name normalizer utility
    utility = getUtility(IFileNameNormalizer)
    return utility.normalize(string)
1232
1233
1234
def is_uid(uid, validate=False):
    """Check if the passed in uid is a valid UID

    A string is considered a valid UID if it is either '0' or if it is 32
    characters long and matches `UID_RX`.

    :param uid: The uid to check
    :type uid: string
    :param validate: If False, only the format of the uid is checked. If
        True, also verifies that a brain exists in the uid_catalog for the
        uid passed in
    :type validate: bool
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False
    if validate:
        # Check if a brain for this uid exists
        catalog = get_tool('uid_catalog')
        matches = catalog(UID=uid)
        if matches:
            assert (len(matches) == 1)
        return len(matches) > 0
    return True
1261
1262
1263
def is_date(date):
    """Check if the passed in value is a date

    Both Zope's DateTime and Python's datetime instances are accepted.

    :param date: The date to check
    :type date: DateTime or datetime
    :return: True if a valid date, False for falsy or other values
    :rtype: bool
    """
    return bool(date) and isinstance(date, (DateTime, datetime))
1274
1275
1276
def to_date(value, default=None):
    """Try to convert the passed in value to Zope's DateTime

    If the value is empty or cannot be converted, the conversion of
    `default` is returned instead (None if `default` is None).

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Fallback value, converted the same way as `value`
    :return: The DateTime representation of the value passed in or default
    """
    if isinstance(value, DateTime):
        return value
    if value:
        try:
            if isinstance(value, str) and '.' in value:
                # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
                return DateTime(value, datefmt='international')
            return DateTime(value)
        except (TypeError, ValueError, DateTimeError):
            return to_date(default)
    # nothing to convert: rely on the default, if any
    return None if default is None else to_date(default)
1296
1297
1298
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Return the total number of minutes of the passed in time parts

    Every part is converted with `float()` before being summed up.

    :param round_to_int: If True, the total is rounded and returned as int,
        otherwise the float total is returned
    :returns: The computed total number of minutes
    """
    total = float(days) * 24 * 60
    total += float(hours) * 60
    total += float(minutes)
    total += float(seconds) / 60
    total += float(milliseconds) / 1000 / 60
    if round_to_int:
        return int(round(total))
    return total
1305
1306
1307
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Return a string representation of the time in "xd yh zm" format

    Parts that are zero are left out, except for the hours, which are always
    displayed when both days and minutes are present.
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    span = timedelta(minutes=int(round(total)))
    hour_count, remainder = divmod(span.seconds, 3600)
    minute_count = remainder // 60

    days_txt = "{}d ".format(str(span.days)) if span.days else ""
    mins_txt = "{}m ".format(str(minute_count)) if minute_count else ""
    if (days_txt and mins_txt) or hour_count:
        hours_txt = "{}h ".format(str(hour_count))
    else:
        hours_txt = ""
    return "".join([days_txt, hours_txt, mins_txt]).strip()
1323
1324
1325
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    If the value cannot be converted, the conversion of `default` is returned
    instead. If `default` is None, None is returned. If no default was
    passed in, `fail` is called.

    :param value: The value to be converted to an int
    :param default: Fallback value, converted the same way as `value`
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is raised by int() for infinite floats, which pass
        # the floatable check above (e.g. "inf")
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1342
1343
1344
def is_floatable(value):
    """Checks if the passed in value is a valid floatable number

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if the value can be converted with `float()`
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is raised for integers that are too large to be
        # represented as a float; those are not floatable either
        return False
    return True
1356
1357
1358
def to_float(value, default=_marker):
    """Convert the passed in value to a float number

    If the value is not floatable, the conversion of `default` is returned
    instead. If no default was passed in, `fail` is called.

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Fallback value, converted the same way as `value`
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    # only reached if `fail` returns instead of raising
    return float(value)
1371
1372
1373
def to_searchable_text_metadata(value):
    """Convert the given metadata value to searchable text

    Empty values, missing values, UIDs and booleans give an empty string.
    Lists, tuples and dictionary values are converted item by item and
    joined with spaces. Dates are formatted as "%Y-%m-%d". For AT content
    the title is used.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value
    """
    if (not value or value is Missing.Value or is_uid(value)
            or isinstance(value, (bool))):
        return u""
    if isinstance(value, (list, tuple)):
        parts = [to_searchable_text_metadata(item) for item in value]
        # drop the items that resulted in an empty text
        return " ".join(filter(None, parts))
    if isinstance(value, dict):
        return to_searchable_text_metadata(value.values())
    if is_date(value):
        return value.strftime("%Y-%m-%d")
    if is_at_content(value):
        return to_searchable_text_metadata(get_title(value))
    text = value if isinstance(value, six.string_types) else str(value)
    return safe_unicode(text)
1400
1401
1402
def get_registry_record(name, default=None):
    """Return the value of a registry record

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    record = ploneapi.portal.get_registry_record(name, default=default)
    return record
1412
1413
1414
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Create a Plone DisplayList from list items

    `pairs` may be a list of (key, value) pairs, a single pair of strings, or
    a single string that is used as both key and value.

    :param pairs: list of key, value pairs
    :param sort_by: Sort the items either by "key" or by "value"; any other
        value leaves the list unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    result = DisplayList()

    if isinstance(pairs, six.string_types):
        pairs = [pairs, pairs]
    for item in pairs:
        if isinstance(item, (tuple, list)):
            # pairs is a list of lists -> add each pair
            result.add(*item)
        elif isinstance(item, six.string_types):
            # pairs is just a single pair -> add it and stop
            result.add(*pairs)
            break

    # add the empty option
    if allow_empty:
        result.add("", "")

    # sort by key/value
    if sort_by == "key":
        return result.sortedByKey()
    if sort_by == "value":
        return result.sortedByValue()
    return result
1446
1447
1448
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert newline characters in the text to HTML `<br/>` tags

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the converted text in. The text
        is not wrapped if empty
    :param encoding: Encoding used for the returned HTML
    :returns: HTML converted and encoded text, or an empty string if no text
        was passed in
    """
    if not text:
        return ""
    # the text is processed as unicode and only encoded on return
    markup = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        markup = u"<{tag}>{html}</{tag}>".format(
            tag=wrap, html=markup)
    return markup.encode(encoding)
1466
1467
1468
def to_utf8(string, default=_marker):
    """Encode the passed in string to UTF8

    :param string: String to be encoded to UTF8
    :param default: Value returned if `string` is not a string. If omitted,
        `fail` is called for non-string values
    :returns: UTF8 encoded string
    """
    if isinstance(string, six.string_types):
        return safe_unicode(string).encode("utf8")
    if default is _marker:
        fail("Expected string type, got '%s'" % type(string))
    return default
1479