Passed
Push — master ( 465f87...8b8f25 )
by Jordi
04:20
created

bika.lims.api.text_to_html()   A

Complexity

Conditions 3

Size

Total Lines 18
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 18
rs 9.95
c 0
b 0
f 0
cc 3
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import re
22
from datetime import datetime
23
from datetime import timedelta
24
25
import Missing
26
from AccessControl.PermissionRole import rolesForPermissionOn
27
from Acquisition import aq_base
28
from bika.lims import logger
29
from bika.lims.interfaces import IClient
30
from bika.lims.interfaces import IContact
31
from bika.lims.interfaces import ILabContact
32
from DateTime import DateTime
33
from DateTime.interfaces import DateTimeError
34
from plone import api as ploneapi
35
from plone.api.exc import InvalidParameterError
36
from plone.app.layout.viewlets.content import ContentHistoryView
37
from plone.dexterity.interfaces import IDexterityContent
38
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
39
from plone.i18n.normalizer.interfaces import IIDNormalizer
40
from plone.memoize.volatile import DontCache
41
from Products.Archetypes.atapi import DisplayList
42
from Products.Archetypes.BaseObject import BaseObject
43
from Products.CMFCore.interfaces import IFolderish
44
from Products.CMFCore.interfaces import ISiteRoot
45
from Products.CMFCore.utils import getToolByName
46
from Products.CMFCore.WorkflowCore import WorkflowException
47
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
48
from Products.CMFPlone.utils import _createObjectByType
49
from Products.CMFPlone.utils import base_hasattr
50
from Products.CMFPlone.utils import safe_unicode
51
from Products.PlonePAS.tools.memberdata import MemberData
52
from Products.ZCatalog.interfaces import ICatalogBrain
53
from zope import globalrequest
54
from zope.component import getMultiAdapter
55
from zope.component import getUtility
56
from zope.component.interfaces import IFactory
57
from zope.event import notify
58
from zope.lifecycleevent import ObjectCreatedEvent
59
from zope.lifecycleevent import modified
60
from zope.security.interfaces import Unauthorized
61
62
"""SENAITE LIMS Framework API
63
64
Please see bika.lims/docs/API.rst for documentation.
65
66
Architectural Notes:
67
68
Please add only functions that do a single thing for a single object.
69
70
Good: `def get_foo(brain_or_object)`
71
Bad:  `def get_foos(list_of_brain_objects)`
72
73
Why?
74
75
Because it makes things more complex. You can always use a pattern like this to
76
achieve the same::
77
78
    >>> foos = map(get_foo, list_of_brain_objects)
79
80
Please add for all of your functions a descriptive test in docs/API.rst.
81
82
Thanks.
83
"""
84
85
_marker = object()
86
87
UID_RX = re.compile("[a-z0-9]{32}$")
88
89
90
class APIError(Exception):
    """Exception type used for all bika.lims API errors."""
92
93
94
def get_portal():
    """Return the portal object

    :returns: The object returned by `plone.api.portal.getSite`
    """
    site = ploneapi.portal.getSite()
    return site
100
101
102
def get_setup():
    """Look up the `bika_setup` folder on the portal

    :returns: Whatever the portal's `get` returns for the id `bika_setup`
    """
    return get_portal().get("bika_setup")
107
108
109
def get_bika_setup():
    """Return the `bika_setup` folder

    Thin wrapper that simply delegates to `get_setup`.
    """
    setup = get_setup()
    return setup
113
114
115
def create(container, portal_type, *args, **kwargs):
    """Create a new content object inside the given container

    The construction steps are modelled after
    `Products.CMFCore.TypesTool._constructInstance`.

    :param container: The container the new object is added to
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param title: Optional keyword argument holding the title of the new
        object. Falls back to "New <portal_type>" when missing or None
    :type title: string
    :returns: The newly created object
    """
    from bika.lims.utils import tmpID

    # provide a generic title if the caller did not pass one
    if kwargs.get("title") is None:
        kwargs["title"] = "New {}".format(portal_type)

    # the object is first created below a temporary ID
    temporary_id = tmpID()

    # look up the type information of the requested portal type
    type_info = get_tool("portal_types").getTypeInfo(portal_type)

    if not type_info.product:
        # newstyle factory
        factory = getUtility(IFactory, type_info.factory)
        content = factory(temporary_id, *args, **kwargs)
        if hasattr(content, '_setPortalTypeName'):
            content._setPortalTypeName(type_info.getId())
        notify(ObjectCreatedEvent(content))
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(temporary_id, content)
        # fetch the object again by its *current* id, because an event
        # handler might have renamed it already
        content = container._getOb(content.getId())
    else:
        content = _createObjectByType(portal_type, container, temporary_id)

    # AT content needs its form processed first
    if is_at_content(content):
        content.processForm()

    # Edit after processForm; processForm does AT unmarkCreationFlag.
    content.edit(**kwargs)

    # send the modified notification explicitly
    modified(content)
    return content
166
167
168
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: Optional object used for the lookup with
        `getToolByName`
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned when the plone api finds no such tool
    :returns: Portal Tool
    :raises APIError: if the tool is not found and no default was given
    """

    if context is None:
        # No context given: ask the plone api
        try:
            return ploneapi.portal.get_tool(name)
        except InvalidParameterError:
            if default is _marker:
                fail("No tool named '%s' found." % name)
            return default

    # A context was given: try it first
    try:
        context = get_object(context)
        return getToolByName(context, name)
    except (APIError, AttributeError) as e:
        # https://github.com/senaite/bika.lims/issues/396
        logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
                    "-> falling back to plone.api.portal.get_tool('{}')"
                    .format(repr(context), name, repr(e), name))
        # retry without the context
        return get_tool(name, default=default)
197
198
199
def fail(msg=None):
    """Raise an APIError carrying the given message

    :param msg: The reason of the failure. "Reason not given." is used
        when the message is None
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
205
206
207
def is_object(brain_or_object):
    """Tell whether the passed in object is a supported portal content

    Supported are the portal root, AT contents, Dexterity contents and
    catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    checks = (is_portal, is_at_content, is_dexterity_content, is_brain)
    for check in checks:
        if check(brain_or_object):
            return True
    return False
223
224
225
def get_object(brain_object_uid, default=_marker):
    """Resolve a brain, content object or UID to the full content object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Value returned for unsupported input
    :returns: The full object
    :raises APIError: for unsupported input when no default was given
    """
    if is_uid(brain_object_uid):
        return get_object_by_uid(brain_object_uid)

    if is_object(brain_object_uid):
        # brains need to be woken up, everything else is returned as-is
        if is_brain(brain_object_uid):
            return brain_object_uid.getObject()
        return brain_object_uid

    if default is _marker:
        fail("{} is not supported.".format(repr(brain_object_uid)))
    return default
242
243
244
def is_portal(brain_or_object):
    """Tell whether the passed in object is the portal root object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISiteRoot`
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
253
254
255
def is_brain(brain_or_object):
    """Tell whether the passed in object is a catalog brain

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ICatalogBrain`
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
264
265
266
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object is a dexterity content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `IDexterityContent`
    :rtype: bool
    """
    provides_dexterity = IDexterityContent.providedBy(brain_or_object)
    return provides_dexterity
275
276
277
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an AT content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an instance of Archetypes `BaseObject`
    :rtype: bool
    """
    is_base_object = isinstance(brain_or_object, BaseObject)
    return is_base_object
286
287
288
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute on the passed in object takes precedence
    (it is called if it is callable). Otherwise the full object is checked
    for the `IFolderish` interface.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))

    folderish = brain_or_object.is_folderish
    return folderish() if callable(folderish) else folderish
301
302
303
def get_portal_type(brain_or_object):
    """Return the portal type of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: if the passed in object is not supported
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
314
315
316
def get_schema(brain_or_object):
    """Return the schema of the content

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: for the portal root and for objects that are neither
        Dexterity nor AT contents
    """
    content = get_object(brain_or_object)

    # the portal root has no schema
    if is_portal(content):
        fail("get_schema can't return schema of portal root")

    # Dexterity: the schema is looked up through the type information
    if is_dexterity_content(content):
        types_tool = get_tool("portal_types")
        type_info = types_tool.getTypeInfo(content.portal_type)
        return type_info.lookupSchema()

    # Archetypes: the schema comes from the object itself
    if is_at_content(content):
        return content.Schema()

    fail("{} has no Schema.".format(brain_or_object))
333
334
335
def get_fields(brain_or_object):
    """Return the schema fields of the object, keyed by field name

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of field name -> field
    :rtype: dict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)

    if is_dexterity_content(content):
        # Dexterity schemas expose their fields by name
        return {name: schema.get(name) for name in schema.names()}

    # AT schemas provide keys and fields in matching order
    return dict(zip(schema.keys(), schema.fields()))
350
351
352
def get_id(brain_or_object):
    """Return the Plone ID of the object

    For brains, the `getId` or `id` attribute is used if available, so
    that the object does not have to be woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for attr in ("getId", "id"):
            if base_hasattr(brain_or_object, attr):
                return getattr(brain_or_object, attr)
    return get_object(brain_or_object).getId()
366
367
368
def get_title(brain_or_object):
    """Return the Title of the object

    For brains, the `Title` attribute is used if available. Otherwise
    `Title()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Title")
    if from_brain:
        return brain_or_object.Title
    return get_object(brain_or_object).Title()
379
380
381
def get_description(brain_or_object):
    """Return the Description of the object

    For brains, the `Description` attribute is used if available.
    Otherwise `Description()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Description")
    if from_brain:
        return brain_or_object.Description
    return get_object(brain_or_object).Description()
393
394
395
def get_uid(brain_or_object):
    """Return the Plone UID of the object

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID. '0' is returned for the portal object
    :rtype: string
    """
    # an UID is handed back untouched
    if is_uid(brain_or_object):
        return brain_or_object

    # the portal object has the fixed UID '0'
    if is_portal(brain_or_object):
        return '0'

    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "UID")
    if from_brain:
        return brain_or_object.UID
    return get_object(brain_or_object).UID()
410
411
412
def get_url(brain_or_object):
    """Return the absolute URL of the object

    For brains, `getURL()` is used if available. Otherwise
    `absolute_url()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getURL")
    if from_brain:
        return brain_or_object.getURL()
    return get_object(brain_or_object).absolute_url()
423
424
425
def get_icon(brain_or_object, html_tag=True):
    """Get the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url.
        An empty string is returned if the type has no icon
    :rtype: string
    """
    from xml.sax.saxutils import escape

    # Manual approach, because `plone.app.layout.getIcon` does not work
    # reliably for contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url

    # The title is content data: escape it, so that quotes or markup in
    # the title can neither break out of the attribute nor inject HTML.
    title = get_title(brain_or_object)
    if isinstance(title, basestring):
        title = escape(title, {'"': "&quot;"})

    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
        url=url, title=title)
    return tag
449
450
451
def get_object_by_uid(uid, default=_marker):
    """Find an object by a given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if the UID is empty or nothing is found
    :returns: Found Object or the default
    :raises APIError: if the UID is empty or nothing was found and no
        default was given
    """

    # an empty UID can not be resolved
    if not uid:
        if default is _marker:
            fail("get_object_by_uid requires UID as first argument; "
                 "got {} instead".format(uid))
        return default

    # we defined the portal object UID to be '0'::
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)
    if brain is not None:
        return get_object(brain)

    if default is _marker:
        fail("No object found for UID {}".format(uid))
    return default
478
479
480
def get_brain_by_uid(uid, default=None):
    """Query the `uid_catalog` for the brain with the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if `uid` is no UID or if the catalog
        did not return exactly one result
    :returns: ZCatalog brain or the default (None)
    """
    if not is_uid(uid):
        return default

    # search the UID catalog
    uid_catalog = get_tool("uid_catalog")
    found = uid_catalog(UID=uid)

    # only an unambiguous match is accepted
    return found[0] if len(found) == 1 else default
498
499
500
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path or the absolute URL of the object
    :type path: string
    :param default: Value returned if the path is empty, lies outside the
        portal or can not be traversed
    :returns: Found Object or the default
    :raises APIError: if the object can not be found and no default was
        given
    """

    # nothing to do here
    if not path:
        if default is not _marker:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if default is not _marker:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    # Traverse with the private sentinel as default, so that a failed
    # traversal can be detected here. The sentinel itself must never be
    # handed out to the caller.
    obj = portal.restrictedTraverse(path, _marker)
    if obj is _marker:
        if default is not _marker:
            return default
        fail("No object found at path {}".format(path))
    return obj
533
534
535
def get_path(brain_or_object):
    """Return the physical path of the object

    Brains deliver the path with `getPath()`. For all other objects, the
    segments of `getPhysicalPath()` are joined with "/".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
546
547
548
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object. For the portal, the path
        of the portal itself is returned
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())

    if is_brain(brain_or_object):
        # cut off the last path segment, no need to wake up the object
        own_path = get_path(brain_or_object)
        parent_path, _, _ = own_path.rpartition("/")
        return parent_path

    parent = get_object(brain_or_object).aq_parent
    return get_path(parent)
562
563
564
def get_parent(brain_or_object, catalog_search=False):
    """Locate the parent object of the content/catalog brain

    With `catalog_search` enabled, the `portal_catalog` is searched for the
    parent path and a brain is returned instead of the full parent object.
    If that search yields no results, the full parent object is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """

    # the portal is returned for the portal itself
    if is_portal(brain_or_object):
        return get_portal()

    # default: acquire the parent from the full object
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # the parent is the portal object
    if parent_path == get_path(get_portal()):
        return get_portal()

    # query the portal catalog for exactly the parent path
    catalog = get_portal_catalog()
    brains = catalog(path={
        "query": parent_path,
        "depth": 0})

    if brains:
        return brains[0]

    # no results fallback: return the parent object
    return get_object(brain_or_object).aq_parent
606
607
608
def search(query, catalog=_marker):
    """Search for objects.

    If no catalog is given, the first catalog registered for each queried
    portal type is used; the `portal_catalog` is the final fallback.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises APIError: if the query is no dictionary or if more than one
        catalog would have to be searched
    """

    # only dictionary queries are supported
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # the queried portal types, always handled as a sequence
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is _marker:
        # No catalog specified: take the first registered/default catalog
        # of each queried portal type
        catalogs = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in portal_types]
    elif isinstance(catalog, (list, tuple)):
        # multiple catalog ids specified
        catalogs = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        # a single catalog id specified
        catalogs = [get_tool(catalog)]

    # drop duplicates and fall back to the portal catalog
    catalogs = list(set(catalogs)) or [get_portal_catalog()]

    # We only support **single** catalog queries
    if len(catalogs) > 1:
        fail("Multi Catalog Queries are not supported!")

    return catalogs[0](query)
654
655
656
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the attribute value; callables are called first

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value returned if the attribute is missing or if the
        access is not authorized
    :returns: Attribute value
    :rtype: obj
    :raises APIError: if the attribute is missing or not accessible and no
        default was given
    """
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is _marker:
            if default is _marker:
                fail("Attribute '{}' not found.".format(attr))
            return default
        return value() if callable(value) else value
    except Unauthorized:
        if default is _marker:
            fail("You are not authorized to access '{}' of '{}'.".format(
                attr, repr(brain_or_object)))
        return default
680
681
682
def get_portal_catalog():
    """Return the `portal_catalog` tool

    :returns: Portal Catalog Tool
    """
    catalog = get_tool("portal_catalog")
    return catalog
688
689
690
def get_review_history(brain_or_object, rev=True):
    """Get the review history for the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: The history is reversed only if this is exactly `True`
    :type rev: bool
    :returns: Workflow history. An empty list is returned if the history
        could not be retrieved
    :rtype: [{}, ...]
    """
    instance = get_object(brain_or_object)
    history = []
    try:
        workflow_tool = get_tool("portal_workflow")
        history = workflow_tool.getInfoFor(instance, 'review_history')
    except WorkflowException as exc:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            instance, str(exc)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
        # Expression.StateChangeInfo, that always returns a list, except when
        # no review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        # anything else than a list or tuple is discarded
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
727
728
729
def get_revision_history(brain_or_object):
    """Get the revision history for the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The result of `ContentHistoryView.fullHistory()`
    :rtype: obj
    """
    instance = get_object(brain_or_object)
    # the view gets the REQUEST of the object, or None if not available
    request = safe_getattr(instance, "REQUEST", None)
    history_view = ContentHistoryView(instance, request)
    return history_view.fullHistory()
740
741
742
def get_workflows_for(brain_or_object):
    """Get the assigned workflows for the given brain or context.

    Note: This function supports also the portal_type as parameter.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    workflow_tool = ploneapi.portal.get_tool("portal_workflow")

    # strings (portal types) are passed on as they are, everything else is
    # resolved to the full object first
    target = brain_or_object
    if not isinstance(target, basestring):
        target = get_object(target)
    return workflow_tool.getChainFor(target)
757
758
759
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Get the current workflow status of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status. An empty string is the default passed to the
        workflow tool
    :rtype: str
    """
    # Prefer the value stored in the catalog brain
    in_brain = is_brain(brain_or_object) \
        and state_var in brain_or_object.schema()
    if in_brain:
        return brain_or_object[state_var]

    # Retrieve the state from the object
    workflow_tool = get_tool("portal_workflow")
    instance = get_object(brain_or_object)
    return workflow_tool.getInfoFor(ob=instance, name=state_var, default='')
778
779
780
def get_creation_date(brain_or_object):
    """Return the creation date of the brain or object

    The `created` attribute is read; it is called if it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    :raises APIError: if the `created` attribute is missing or None
    """
    creation_date = getattr(brain_or_object, "created", None)
    if creation_date is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    return creation_date() if callable(creation_date) else creation_date
795
796
797
def get_modification_date(brain_or_object):
    """Return the modification date of the brain or object

    The `modified` attribute is read; it is called if it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    :raises APIError: if the `modified` attribute is missing or None
    """
    modification_date = getattr(brain_or_object, "modified", None)
    if modification_date is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    if callable(modification_date):
        return modification_date()
    return modification_date
812
813
814
def get_review_status(brain_or_object):
    """Get the `review_state` of an object

    For brains, the `review_state` attribute is used if the brain has it.
    Brains without it (e.g. coming from a catalog that has no such
    metadata column) and all other objects are handled by
    `get_workflow_status_of`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    # Guard the attribute access like the other brain-aware getters do,
    # instead of failing with an AttributeError
    if is_brain(brain_or_object) \
            and base_hasattr(brain_or_object, "review_state"):
        return brain_or_object.review_state
    return get_workflow_status_of(brain_or_object, state_var="review_state")
825
826
827
def is_active(brain_or_object):
    """Check that the object is neither 'inactive' nor 'cancelled'.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    status = get_review_status(brain_or_object)
    return status not in ["cancelled", "inactive"]
838
839
840
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Get all registered catalogs for the given portal_type, catalog brain or
    content object

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Name of the catalog to use if the archetype tool is
        not available or has no catalogs registered for the type
    :type default: string
    :returns: List of supported catalogs
    :rtype: list
    """
    # `default` has to be passed by keyword: the second positional
    # parameter of `get_tool` is the context. Otherwise a missing tool
    # raises an APIError and the fallback below is never reached.
    archetype_tool = get_tool("archetype_tool", default=None)
    if not archetype_tool:
        # return the default catalog
        return [get_tool(default)]

    catalogs = []

    # get the registered catalogs for portal_type
    if is_object(brain_or_object):
        catalogs = archetype_tool.getCatalogsByType(
            get_portal_type(brain_or_object))
    if isinstance(brain_or_object, basestring):
        catalogs = archetype_tool.getCatalogsByType(brain_or_object)

    if not catalogs:
        return [get_tool(default)]
    return catalogs
866
867
868
def get_transitions_for(brain_or_object):
    """Collect the transitions offered by every workflow of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    obj = get_object(brain_or_object)
    collected = []
    for workflow_id in get_workflows_for(brain_or_object):
        offered = wf_tool[workflow_id].getTransitionsFor(obj)
        # skip transitions already contributed by a previous workflow
        unseen = [item for item in offered if item not in collected]
        collected += unseen
    return collected
884
885
886
def do_transition_for(brain_or_object, transition):
    """Trigger the given workflow transition on the passed in object.

    Problems (a non-string transition or a transition rejected by plone.api)
    are reported through `fail`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: Id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, basestring):
        message = "Transition type needs to be string, got '%s'"
        fail(message % type(transition))
    target = get_object(brain_or_object)
    try:
        ploneapi.content.transition(target, transition)
    except ploneapi.exc.InvalidParameterError as exc:
        template = "Failed to perform transition '{}' on {}: {}"
        fail(template.format(transition, target, str(exc)))
    return target
902
903
904
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted, de-duplicated roles holding a permission.

    :param permission: The permission to look up the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    context = get_object(brain_or_object)
    granted = rolesForPermissionOn(permission, context)
    return sorted(set(granted))
915
916
917
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honour the requested policy instead of a hard-coded one; the default
    # value keeps the behavior for callers not passing `policy`
    return pr.supportsPolicy(obj, policy) \
        and pr.isVersionable(obj)
929
930
931
def get_version(brain_or_object):
    """Return the version id stored on the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object (0 if no `version_id` is
        set), or None if the object is not versionable
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # read from the unwrapped object to bypass acquisition
        return getattr(aq_base(obj), "version_id", 0)
    return None
943
944
945
def get_view(name, context=None, request=None):
    """Look up a view by its name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view, defaults to the portal
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view, defaults to the global
        request
    :type request: HTTPRequest object
    :returns: The view registered under `name` for context and request
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    adapted = (get_object(context), request)
    return getMultiAdapter(adapted, name=name)
960
961
962
def get_request():
    """Return the request object provided by `globalrequest`

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
969
970
971
def get_group(group_or_groupname):
    """Resolve a Plone group

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData, or None if a falsy value was passed in
    """
    group = group_or_groupname
    if not group:
        return None
    if not hasattr(group, "_getGroup"):
        # not a group object -> look it up by its id
        group = get_tool("portal_groups").getGroupById(group)
    return group
985
986
987
def get_user(user_or_username):
    """Resolve a Plone user

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None if the passed in value is neither a
        MemberData object nor a string
    """
    if isinstance(user_or_username, basestring):
        # look the member up by its login name
        return get_member_by_login_name(get_portal(), user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
999
1000
1001
def get_user_properties(user_or_username):
    """Gather the properties of all property sheets of a user

    :param user_or_username: Plone user or user id
    :returns: Mapping of property name to value; empty if the user could not
        be resolved or does not provide a callable `getUser`
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}
    plone_user = member.getUser()
    properties = {}
    # merge the items of every sheet; later sheets override earlier ones
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1018
1019
1020
def get_users_by_roles(roles=None):
    """Find the Plone users having the given role(s)

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    if isinstance(roles, (tuple, list)):
        wanted = roles
    else:
        # a single value was passed in -> wrap it into a list
        wanted = [roles]
    return get_tool("portal_membership").searchForMembers(roles=wanted)
1031
1032
1033
def get_current_user():
    """Return the user reported as current by `plone.api`

    :returns: Current User
    """
    current = ploneapi.user.get_current()
    return current
1039
1040
1041
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Find the contact linked to a Plone user

    None is returned if the user has no contact associated or if the user is
    bound to more than one contact (the latter is logged as an error).
    The `contact_types` parameter filters the portal types for the search.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    found = search(
        {'portal_type': contact_types, 'getUsername': user.id},
        catalog='portal_catalog')
    if not found:
        return None

    if len(found) == 1:
        return get_object(found[0])

    # Ambiguous: the user has multiple contacts assigned
    titles = ','.join([brain.Title for brain in found])
    template = "User '{}' is bound to multiple Contacts '{}'"
    logger.error(template.format(user.id, titles))
    return None
1068
1069
1070
def get_user_client(user_or_contact):
    """Find the client a Plone user or a contact belongs to

    None is returned if the user passed in has no contact or does not belong
    to any client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact:
        return None
    if ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # the client is expected to be the parent of the contact
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # a user was passed in -> resolve its contact first
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if not IContact.providedBy(contact):
        return None
    return get_user_client(contact)
1094
1095
1096
def get_current_client():
    """Return the client of the currently logged in user, if any

    :returns: Client the current logged in user belongs to or None
    """
    user = get_current_user()
    return get_user_client(user)
1102
1103
1104
def get_cache_key(brain_or_object):
    """Build a cache key for a common brain or object

    The key joins portal type, id, UID, URL and modification time with "-".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # the URL makes the key differ between domains
        get_url(brain_or_object),
        # microseconds since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join([str(part) for part in parts])
1122
1123
1124
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function computing the key from the brain or object

    `method` and `self` are accepted but not used for the key.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1135
1136
1137
def normalize_id(string):
    """Normalize a string with the registered `IIDNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, basestring):
        template = "Type of argument must be string, found '{}'"
        fail(template.format(type(string)))
    # delegate to the id normalizer utility
    id_normalizer = getUtility(IIDNormalizer)
    return id_normalizer.normalize(string)
1151
1152
1153
def normalize_filename(string):
    """Normalize a string with the registered `IFileNameNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, basestring):
        template = "Type of argument must be string, found '{}'"
        fail(template.format(type(string)))
    # delegate to the file name normalizer utility
    filename_normalizer = getUtility(IFileNameNormalizer)
    return filename_normalizer.normalize(string)
1167
1168
1169
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    The string '0' is always accepted. Any other value has to be a string of
    32 characters matching `UID_RX`.

    :param uid: The uid to check
    :param validate: If False, only the format of the uid is checked. If
    True, also verifies if a brain exists for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, basestring):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False

    if validate:
        # Check if a brain for this uid exists
        catalog = get_tool('uid_catalog')
        matches = catalog(UID=uid)
        if matches:
            assert (len(matches) == 1)
        return len(matches) > 0

    return True
1196
1197
1198
def is_date(date):
    """Checks if the passed in value is a date object

    Both Zope's DateTime and Python's datetime instances are accepted.

    :param date: The date to check
    :type date: DateTime or datetime
    :return: True if a valid date, False for any falsy or other value
    :rtype: bool
    """
    return bool(date) and isinstance(date, (DateTime, datetime))
1209
1210
1211
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Value converted instead when `value` is empty or cannot
        be converted
    :return: The DateTime representation of the value passed in or default
    """
    if isinstance(value, DateTime):
        return value
    if not value:
        return None if default is None else to_date(default)

    options = {}
    if isinstance(value, str) and '.' in value:
        # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
        options['datefmt'] = 'international'
    try:
        return DateTime(value, **options)
    except (TypeError, ValueError, DateTimeError):
        return to_date(default)
1231
1232
1233
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Sum up the passed in time units and return the result in minutes

    :param round_to_int: If True, the total is rounded and returned as int,
        otherwise the float total is returned
    :returns: The computed total number of minutes
    """
    from_days = float(days) * 24 * 60
    from_hours = float(hours) * 60
    from_minutes = float(minutes)
    from_seconds = float(seconds) / 60
    from_millis = float(milliseconds) / 1000 / 60
    total = from_days + from_hours + from_minutes + from_seconds + \
        from_millis
    if round_to_int:
        return int(round(total))
    return total
1240
1241
1242
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Render the passed in amount of time as a string in xd yh zm format

    Parts equal to zero are left out, except for the hours when both days
    and minutes are displayed.
    """
    total_minutes = to_minutes(days=days, hours=hours, minutes=minutes,
                               seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total_minutes)))
    whole_hours, remainder = divmod(delta.seconds, 3600)
    whole_minutes = remainder // 60

    minutes_part = "{}m ".format(whole_minutes) if whole_minutes else ""
    days_part = "{}d ".format(delta.days) if delta.days else ""
    if minutes_part and days_part:
        # always display the hours between days and minutes
        hours_part = "{}h ".format(whole_hours)
    else:
        hours_part = "{}h ".format(whole_hours) if whole_hours else ""
    return (days_part + hours_part + minutes_part).strip()
1258
1259
1260
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    If the value cannot be converted (including infinite values, which have
    no int representation), the default is converted and returned instead.
    A default of None is returned as it is. If no default was passed in, the
    problem is reported through `fail`.

    :param value: The value to be converted to an int
    :param default: Fallback used if the value is not convertible
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")), ValueError: int(float("nan"))
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1277
1278
1279
def is_floatable(value):
    """Checks if the passed in value can be converted by `float()`

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    return True
1291
1292
1293
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Value converted instead if `value` is not floatable.
        Without a default, the problem is reported through `fail`
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if not is_floatable(value):
        if default is _marker:
            fail("Value %s is not floatable" % repr(value))
        else:
            return to_float(default)
    return float(value)
1306
1307
1308
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Empty values, `Missing.Value`, UIDs and booleans yield an empty string.
    For lists, tuples and dicts (values only), the searchable text of all
    the contained items is joined with a blank. Dates are formatted as
    `YYYY-MM-DD`. Any other value is converted to its string representation.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value, an empty string if there is
        nothing to search for
    """
    if not value:
        return u""
    if value is Missing.Value:
        return u""
    if is_uid(value):
        return u""
    if isinstance(value, bool):
        return u""
    if isinstance(value, dict):
        # only the values of a mapping are searchable
        value = list(value.values())
    if isinstance(value, (list, tuple)):
        # consider all the items, not only the first one
        texts = [to_searchable_text_metadata(item) for item in value]
        return u" ".join([text for text in texts if text])
    if is_date(value):
        return value.strftime("%Y-%m-%d")
    if not isinstance(value, basestring):
        value = str(value)
    return safe_unicode(value)
1333
1334
1335
def get_registry_record(name, default=None):
    """Look up the value of a registry record through `plone.api`

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    record = ploneapi.portal.get_registry_record(name, default=default)
    return record
1345
1346
1347
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Build a Plone DisplayList out of the passed in items

    `pairs` may be a list of (key, value) pairs, a single (key, value) pair
    or a single string, which is used as key and value.

    :param pairs: list of key, value pairs
    :param sort_by: Sort the items either by key or value
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a single string is used for both, the key and the value
    items = [pairs, pairs] if isinstance(pairs, basestring) else pairs
    for item in items:
        if isinstance(item, basestring):
            # items is just a single pair -> add it and stop
            display_list.add(*items)
            break
        if isinstance(item, (tuple, list)):
            # items is a list of pairs -> add each pair
            display_list.add(*item)

    # add the empty option
    if allow_empty:
        display_list.add("", "")

    # sort by key/value
    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
1379
1380
1381
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert the newline characters of the text to HTML line breaks

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the converted text in. A falsy
        value disables the wrapping
    :param encoding: Encoding used for the returned HTML
    :returns: HTML converted and encoded text, an empty string if no text
        was passed in
    """
    if not text:
        return ""
    # work on unicode internally and turn each newline into a line break
    markup = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        markup = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=markup)
    # hand out the encoded html
    return markup.encode(encoding)
1399