Passed
Push — 2.x ( 906086...16631d )
by Jordi
08:09
created

bika.lims.api   F

Complexity

Total Complexity 244

Size/Duplication

Total Lines 1506
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 244
eloc 668
dl 0
loc 1506
rs 1.932
c 0
b 0
f 0

76 Functions

Rating   Name   Duplication   Size   Complexity  
A get_portal_catalog() 0 6 1
B get_user_client() 0 24 7
A get_id() 0 14 4
A get_creation_date() 0 15 3
A get_uid() 0 15 5
B get_object() 0 19 6
A get_revision_history() 0 11 1
A get_url() 0 11 3
A get_user_properties() 0 17 4
C to_searchable_text_metadata() 0 27 10
A get_bika_setup() 0 4 1
A get_icon() 0 24 3
A normalize_id() 0 14 2
A get_user() 0 12 3
A is_floatable() 0 12 2
A is_at_content() 0 9 1
A get_parent_path() 0 14 3
A get_test_request() 0 6 1
B is_object() 0 18 6
A get_portal_type() 0 11 2
B get_object_by_uid() 0 27 6
A get_modification_date() 0 15 3
A get_catalogs_for() 0 26 5
A get_request() 0 7 1
A text_to_html() 0 18 3
A to_utf8() 0 11 3
A is_temporary() 0 23 5
A get_fields() 0 21 4
A create() 0 45 3
B to_display_list() 0 32 8
A get_user_contact() 0 27 5
B get_object_by_path() 0 33 7
A to_minutes() 0 7 2
A is_active() 0 11 2
A to_float() 0 13 3
A to_int() 0 17 5
A get_brain_by_uid() 0 18 3
A get_current_client() 0 6 1
A to_dhm_format() 0 16 3
A bika_cache_key_decorator() 0 11 2
B to_date() 0 20 7
A normalize_filename() 0 14 2
A get_workflow_status_of() 0 19 3
A get_version() 0 12 2
A get_parent() 0 42 5
A get_description() 0 12 3
A get_transitions_for() 0 16 2
B get_tool() 0 31 6
A get_view() 0 18 2
A get_cache_key() 0 18 2
A get_workflows_for() 0 15 2
B is_uid() 0 27 7
A do_transition_for() 0 16 3
A get_registry_record() 0 10 1
A get_portal() 0 6 1
A get_group() 0 14 3
A fail() 0 6 2
B safe_getattr() 0 24 6
A is_versionable() 0 12 1
A is_portal() 0 9 1
A get_roles_for_permission() 0 11 1
B search() 0 46 7
A is_brain() 0 9 1
A is_supermodel() 0 11 1
A get_title() 0 11 3
A is_dexterity_content() 0 9 1
A get_path() 0 11 2
A get_schema() 0 17 4
A get_users_by_roles() 0 11 2
A get_setup() 0 5 1
A is_date() 0 11 2
A is_folderish() 0 13 3
A get_review_status() 0 11 2
A get_previous_worfklow_status_of() 0 27 5
A get_current_user() 0 6 1
B get_review_history() 0 37 5

How to fix   Complexity   

Complexity

Complex classes like bika.lims.api often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import Missing
22
import re
23
import six
24
from AccessControl.PermissionRole import rolesForPermissionOn
25
from Acquisition import aq_base
26
from Acquisition import aq_inner
27
from Acquisition import aq_parent
28
from bika.lims import logger
29
from bika.lims.interfaces import IClient
30
from bika.lims.interfaces import IContact
31
from bika.lims.interfaces import ILabContact
32
from collections import OrderedDict
33
from datetime import datetime
34
from DateTime import DateTime
35
from datetime import timedelta
36
from DateTime.interfaces import DateTimeError
37
from itertools import groupby
38
from plone import api as ploneapi
39
from plone.api.exc import InvalidParameterError
40
from plone.app.layout.viewlets.content import ContentHistoryView
41
from plone.behavior.interfaces import IBehaviorAssignable
42
from plone.dexterity.interfaces import IDexterityContent
43
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
44
from plone.i18n.normalizer.interfaces import IIDNormalizer
45
from plone.memoize.volatile import DontCache
46
from Products.Archetypes.atapi import DisplayList
47
from Products.Archetypes.BaseObject import BaseObject
48
from Products.CMFCore.interfaces import IFolderish
49
from Products.CMFCore.interfaces import ISiteRoot
50
from Products.CMFCore.utils import getToolByName
51
from Products.CMFCore.WorkflowCore import WorkflowException
52
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
53
from Products.CMFPlone.utils import _createObjectByType
54
from Products.CMFPlone.utils import base_hasattr
55
from Products.CMFPlone.utils import safe_unicode
56
from Products.PlonePAS.tools.memberdata import MemberData
57
from Products.ZCatalog.interfaces import ICatalogBrain
58
from zope import globalrequest
59
from zope.annotation.interfaces import IAttributeAnnotatable
60
from zope.component import getUtility
61
from zope.component import queryMultiAdapter
62
from zope.component.interfaces import IFactory
63
from zope.event import notify
64
from zope.interface import directlyProvides
65
from zope.lifecycleevent import ObjectCreatedEvent
66
from zope.publisher.browser import TestRequest
67
from zope.schema import getFieldsInOrder
68
from zope.security.interfaces import Unauthorized
69
70
"""SENAITE LIMS Framework API
71
72
Please see bika.lims/docs/API.rst for documentation.
73
74
Architecural Notes:
75
76
Please add only functions that do a single thing for a single object.
77
78
Good: `def get_foo(brain_or_object)`
79
Bad:  `def get_foos(list_of_brain_objects)`
80
81
Why?
82
83
Because it makes things more complex. You can always use a pattern like this to
84
achieve the same::
85
86
    >>> foos = map(get_foo, list_of_brain_objects)
87
88
Please add for all of your functions a descriptive test in docs/API.rst.
89
90
Thanks.
91
"""
92
93
_marker = object()
94
95
UID_RX = re.compile("[a-z0-9]{32}$")
96
97
98
class APIError(Exception):
99
    """Base exception class for bika.lims errors."""
100
101
102
def get_portal():
103
    """Get the portal object
104
105
    :returns: Portal object
106
    """
107
    return ploneapi.portal.getSite()
108
109
110
def get_setup():
111
    """Fetch the `bika_setup` folder.
112
    """
113
    portal = get_portal()
114
    return portal.get("bika_setup")
115
116
117
def get_bika_setup():
118
    """Fetch the `bika_setup` folder.
119
    """
120
    return get_setup()
121
122
123
def create(container, portal_type, *args, **kwargs):
124
    """Creates an object in Bika LIMS
125
126
    This code uses most of the parts from the TypesTool
127
    see: `Products.CMFCore.TypesTool._constructInstance`
128
129
    :param container: container
130
    :type container: ATContentType/DexterityContentType/CatalogBrain
131
    :param portal_type: The portal type to create, e.g. "Client"
132
    :type portal_type: string
133
    :param title: The title for the new content object
134
    :type title: string
135
    :returns: The new created object
136
    """
137
    from bika.lims.utils import tmpID
138
139
    id = kwargs.pop("id", tmpID())
140
    title = kwargs.pop("title", "New {}".format(portal_type))
141
142
    # get the fti
143
    types_tool = get_tool("portal_types")
144
    fti = types_tool.getTypeInfo(portal_type)
145
146
    if fti.product:
147
        obj = _createObjectByType(portal_type, container, id)
148
        obj.processForm()
149
        obj.edit(title=title, **kwargs)
150
    else:
151
        # newstyle factory
152
        factory = getUtility(IFactory, fti.factory)
153
        obj = factory(id, *args, **kwargs)
154
        if hasattr(obj, '_setPortalTypeName'):
155
            obj._setPortalTypeName(fti.getId())
156
        # set the title
157
        obj.title = safe_unicode(title)
158
        # notify that the object was created
159
        notify(ObjectCreatedEvent(obj))
160
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
161
        # ContainerModifiedEvent
162
        container._setObject(id, obj)
163
        # we get the object here with the current object id, as it might be
164
        # renamed already by an event handler
165
        obj = container._getOb(obj.getId())
166
167
    return obj
168
169
170
def get_tool(name, context=None, default=_marker):
171
    """Get a portal tool by name
172
173
    :param name: The name of the tool, e.g. `portal_catalog`
174
    :type name: string
175
    :param context: A portal object
176
    :type context: ATContentType/DexterityContentType/CatalogBrain
177
    :returns: Portal Tool
178
    """
179
180
    # Try first with the context
181
    if context is not None:
182
        try:
183
            context = get_object(context)
184
            return getToolByName(context, name)
185
        except (APIError, AttributeError) as e:
186
            # https://github.com/senaite/bika.lims/issues/396
187
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
188
                        "-> falling back to plone.api.portal.get_tool('{}')"
189
                        .format(repr(context), name, repr(e), name))
190
            return get_tool(name, default=default)
191
192
    # Try with the plone api
193
    try:
194
        return ploneapi.portal.get_tool(name)
195
    except InvalidParameterError:
196
        if default is not _marker:
197
            if isinstance(default, six.string_types):
198
                return get_tool(default)
199
            return default
200
        fail("No tool named '%s' found." % name)
201
202
203
def fail(msg=None):
204
    """API LIMS Error
205
    """
206
    if msg is None:
207
        msg = "Reason not given."
208
    raise APIError("{}".format(msg))
209
210
211
def is_object(brain_or_object):
212
    """Check if the passed in object is a supported portal content object
213
214
    :param brain_or_object: A single catalog brain or content object
215
    :type brain_or_object: Portal Object
216
    :returns: True if the passed in object is a valid portal content
217
    """
218
    if is_portal(brain_or_object):
219
        return True
220
    if is_supermodel(brain_or_object):
221
        return True
222
    if is_at_content(brain_or_object):
223
        return True
224
    if is_dexterity_content(brain_or_object):
225
        return True
226
    if is_brain(brain_or_object):
227
        return True
228
    return False
229
230
231
def get_object(brain_object_uid, default=_marker):
232
    """Get the full content object
233
234
    :param brain_object_uid: A catalog brain or content object or uid
235
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
236
    /CatalogBrain/basestring
237
    :returns: The full object
238
    """
239
    if is_uid(brain_object_uid):
240
        return get_object_by_uid(brain_object_uid)
241
    elif is_supermodel(brain_object_uid):
242
        return brain_object_uid.instance
243
    if not is_object(brain_object_uid):
244
        if default is _marker:
245
            fail("{} is not supported.".format(repr(brain_object_uid)))
246
        return default
247
    if is_brain(brain_object_uid):
248
        return brain_object_uid.getObject()
249
    return brain_object_uid
250
251
252
def is_portal(brain_or_object):
253
    """Checks if the passed in object is the portal root object
254
255
    :param brain_or_object: A single catalog brain or content object
256
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
257
    :returns: True if the object is the portal root object
258
    :rtype: bool
259
    """
260
    return ISiteRoot.providedBy(brain_or_object)
261
262
263
def is_brain(brain_or_object):
264
    """Checks if the passed in object is a portal catalog brain
265
266
    :param brain_or_object: A single catalog brain or content object
267
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
268
    :returns: True if the object is a catalog brain
269
    :rtype: bool
270
    """
271
    return ICatalogBrain.providedBy(brain_or_object)
272
273
274
def is_supermodel(brain_or_object):
275
    """Checks if the passed in object is a supermodel
276
277
    :param brain_or_object: A single catalog brain or content object
278
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
279
    :returns: True if the object is a catalog brain
280
    :rtype: bool
281
    """
282
    # avoid circular imports
283
    from senaite.app.supermodel.interfaces import ISuperModel
284
    return ISuperModel.providedBy(brain_or_object)
285
286
287
def is_dexterity_content(brain_or_object):
288
    """Checks if the passed in object is a dexterity content type
289
290
    :param brain_or_object: A single catalog brain or content object
291
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
292
    :returns: True if the object is a dexterity content type
293
    :rtype: bool
294
    """
295
    return IDexterityContent.providedBy(brain_or_object)
296
297
298
def is_at_content(brain_or_object):
299
    """Checks if the passed in object is an AT content type
300
301
    :param brain_or_object: A single catalog brain or content object
302
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
303
    :returns: True if the object is an AT content type
304
    :rtype: bool
305
    """
306
    return isinstance(brain_or_object, BaseObject)
307
308
309
def is_folderish(brain_or_object):
310
    """Checks if the passed in object is folderish
311
312
    :param brain_or_object: A single catalog brain or content object
313
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
314
    :returns: True if the object is folderish
315
    :rtype: bool
316
    """
317
    if hasattr(brain_or_object, "is_folderish"):
318
        if callable(brain_or_object.is_folderish):
319
            return brain_or_object.is_folderish()
320
        return brain_or_object.is_folderish
321
    return IFolderish.providedBy(get_object(brain_or_object))
322
323
324
def get_portal_type(brain_or_object):
325
    """Get the portal type for this object
326
327
    :param brain_or_object: A single catalog brain or content object
328
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
329
    :returns: Portal type
330
    :rtype: string
331
    """
332
    if not is_object(brain_or_object):
333
        fail("{} is not supported.".format(repr(brain_or_object)))
334
    return brain_or_object.portal_type
335
336
337
def get_schema(brain_or_object):
338
    """Get the schema of the content
339
340
    :param brain_or_object: A single catalog brain or content object
341
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
342
    :returns: Schema object
343
    """
344
    obj = get_object(brain_or_object)
345
    if is_portal(obj):
346
        fail("get_schema can't return schema of portal root")
347
    if is_dexterity_content(obj):
348
        pt = get_tool("portal_types")
349
        fti = pt.getTypeInfo(obj.portal_type)
350
        return fti.lookupSchema()
351
    if is_at_content(obj):
352
        return obj.Schema()
353
    fail("{} has no Schema.".format(brain_or_object))
354
355
356
def get_fields(brain_or_object):
357
    """Get a name to field mapping of the object
358
359
    :param brain_or_object: A single catalog brain or content object
360
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
361
    :returns: Mapping of name -> field
362
    :rtype: OrderedDict
363
    """
364
    obj = get_object(brain_or_object)
365
    schema = get_schema(obj)
366
    if is_dexterity_content(obj):
367
        # get the fields directly provided by the interface
368
        fields = getFieldsInOrder(schema)
369
        # append the fields coming from behaviors
370
        behavior_assignable = IBehaviorAssignable(obj)
371
        if behavior_assignable:
372
            behaviors = behavior_assignable.enumerateBehaviors()
373
            for behavior in behaviors:
374
                fields.extend(getFieldsInOrder(behavior.interface))
375
        return OrderedDict(fields)
376
    return OrderedDict(schema)
377
378
379
def get_id(brain_or_object):
380
    """Get the Plone ID for this object
381
382
    :param brain_or_object: A single catalog brain or content object
383
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
384
    :returns: Plone ID
385
    :rtype: string
386
    """
387
    if is_brain(brain_or_object):
388
        if base_hasattr(brain_or_object, "getId"):
389
            return brain_or_object.getId
390
        if base_hasattr(brain_or_object, "id"):
391
            return brain_or_object.id
392
    return get_object(brain_or_object).getId()
393
394
395
def get_title(brain_or_object):
396
    """Get the Title for this object
397
398
    :param brain_or_object: A single catalog brain or content object
399
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
400
    :returns: Title
401
    :rtype: string
402
    """
403
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
404
        return brain_or_object.Title
405
    return get_object(brain_or_object).Title()
406
407
408
def get_description(brain_or_object):
409
    """Get the Title for this object
410
411
    :param brain_or_object: A single catalog brain or content object
412
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
413
    :returns: Title
414
    :rtype: string
415
    """
416
    if is_brain(brain_or_object) \
417
            and base_hasattr(brain_or_object, "Description"):
418
        return brain_or_object.Description
419
    return get_object(brain_or_object).Description()
420
421
422
def get_uid(brain_or_object):
423
    """Get the Plone UID for this object
424
425
    :param brain_or_object: A single catalog brain or content object or an UID
426
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
427
    :returns: Plone UID
428
    :rtype: string
429
    """
430
    if is_uid(brain_or_object):
431
        return brain_or_object
432
    if is_portal(brain_or_object):
433
        return '0'
434
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
435
        return brain_or_object.UID
436
    return get_object(brain_or_object).UID()
437
438
439
def get_url(brain_or_object):
440
    """Get the absolute URL for this object
441
442
    :param brain_or_object: A single catalog brain or content object
443
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
444
    :returns: Absolute URL
445
    :rtype: string
446
    """
447
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
448
        return brain_or_object.getURL()
449
    return get_object(brain_or_object).absolute_url()
450
451
452
def get_icon(brain_or_object, html_tag=True):
453
    """Get the icon of the content object
454
455
    :param brain_or_object: A single catalog brain or content object
456
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
457
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
458
    :type html_tag: bool
459
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
460
    :rtype: string
461
    """
462
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
463
    # work for Contents coming from other catalogs than the
464
    # `portal_catalog`
465
    portal_types = get_tool("portal_types")
466
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
467
    icon = fti.getIcon()
468
    if not icon:
469
        return ""
470
    url = "%s/%s" % (get_url(get_portal()), icon)
471
    if not html_tag:
472
        return url
473
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
474
        url=url, title=get_title(brain_or_object))
475
    return tag
476
477
478
def get_object_by_uid(uid, default=_marker):
479
    """Find an object by a given UID
480
481
    :param uid: The UID of the object to find
482
    :type uid: string
483
    :returns: Found Object or None
484
    """
485
486
    # nothing to do here
487
    if not uid:
488
        if default is not _marker:
489
            return default
490
        fail("get_object_by_uid requires UID as first argument; got {} instead"
491
             .format(uid))
492
493
    # we defined the portal object UID to be '0'::
494
    if uid == '0':
495
        return get_portal()
496
497
    brain = get_brain_by_uid(uid)
498
499
    if brain is None:
500
        if default is not _marker:
501
            return default
502
        fail("No object found for UID {}".format(uid))
503
504
    return get_object(brain)
505
506
507
def get_brain_by_uid(uid, default=None):
508
    """Query a brain by a given UID
509
510
    :param uid: The UID of the object to find
511
    :type uid: string
512
    :returns: ZCatalog brain or None
513
    """
514
    if not is_uid(uid):
515
        return default
516
517
    # we try to find the object with the UID catalog
518
    uc = get_tool("uid_catalog")
519
520
    # try to find the object with the reference catalog first
521
    brains = uc(UID=uid)
522
    if len(brains) != 1:
523
        return default
524
    return brains[0]
525
526
527
def get_object_by_path(path, default=_marker):
528
    """Find an object by a given physical path or absolute_url
529
530
    :param path: The physical path of the object to find
531
    :type path: string
532
    :returns: Found Object or None
533
    """
534
535
    # nothing to do here
536
    if not path:
537
        if default is not _marker:
538
            return default
539
        fail("get_object_by_path first argument must be a path; {} received"
540
             .format(path))
541
542
    portal = get_portal()
543
    portal_path = get_path(portal)
544
    portal_url = get_url(portal)
545
546
    # ensure we have a physical path
547
    if path.startswith(portal_url):
548
        request = get_request()
549
        path = "/".join(request.physicalPathFromURL(path))
550
551
    if not path.startswith(portal_path):
552
        if default is not _marker:
553
            return default
554
        fail("Not a physical path inside the portal.")
555
556
    if path == portal_path:
557
        return portal
558
559
    return portal.restrictedTraverse(path, default)
560
561
562
def get_path(brain_or_object):
563
    """Calculate the physical path of this object
564
565
    :param brain_or_object: A single catalog brain or content object
566
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
567
    :returns: Physical path of the object
568
    :rtype: string
569
    """
570
    if is_brain(brain_or_object):
571
        return brain_or_object.getPath()
572
    return "/".join(get_object(brain_or_object).getPhysicalPath())
573
574
575
def get_parent_path(brain_or_object):
576
    """Calculate the physical parent path of this object
577
578
    :param brain_or_object: A single catalog brain or content object
579
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
580
    :returns: Physical path of the parent object
581
    :rtype: string
582
    """
583
    if is_portal(brain_or_object):
584
        return get_path(get_portal())
585
    if is_brain(brain_or_object):
586
        path = get_path(brain_or_object)
587
        return path.rpartition("/")[0]
588
    return get_path(get_object(brain_or_object).aq_parent)
589
590
591
def get_parent(brain_or_object, catalog_search=False):
592
    """Locate the parent object of the content/catalog brain
593
594
    The `catalog_search` switch uses the `portal_catalog` to do a search return
595
    a brain instead of the full parent object. However, if the search returned
596
    no results, it falls back to return the full parent object.
597
598
    :param brain_or_object: A single catalog brain or content object
599
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
600
    :param catalog_search: Use a catalog query to find the parent object
601
    :type catalog_search: bool
602
    :returns: parent object
603
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
604
    """
605
606
    if is_portal(brain_or_object):
607
        return get_portal()
608
609
    # Do a catalog search and return the brain
610
    if catalog_search:
611
        parent_path = get_parent_path(brain_or_object)
612
613
        # parent is the portal object
614
        if parent_path == get_path(get_portal()):
615
            return get_portal()
616
617
        # get the catalog tool
618
        pc = get_portal_catalog()
619
620
        # query for the parent path
621
        results = pc(path={
622
            "query": parent_path,
623
            "depth": 0})
624
625
        # No results fallback: return the parent object
626
        if not results:
627
            return get_object(brain_or_object).aq_parent
628
629
        # return the brain
630
        return results[0]
631
632
    return get_object(brain_or_object).aq_parent
633
634
635
def search(query, catalog=_marker):
636
    """Search for objects.
637
638
    :param query: A suitable search query.
639
    :type query: dict
640
    :param catalog: A single catalog id or a list of catalog ids
641
    :type catalog: str/list
642
    :returns: Search results
643
    :rtype: List of ZCatalog brains
644
    """
645
646
    # query needs to be a dictionary
647
    if not isinstance(query, dict):
648
        fail("Catalog query needs to be a dictionary")
649
650
    # Portal types to query
651
    portal_types = query.get("portal_type", [])
652
    # We want the portal_type as a list
653
    if not isinstance(portal_types, (tuple, list)):
654
        portal_types = [portal_types]
655
656
    # The catalogs used for the query
657
    catalogs = []
658
659
    # The user did **not** specify a catalog
660
    if catalog is _marker:
661
        # Find the registered catalogs for the queried portal types
662
        for portal_type in portal_types:
663
            # Just get the first registered/default catalog
664
            catalogs.append(get_catalogs_for(
665
                portal_type, default="portal_catalog")[0])
666
    else:
667
        # User defined catalogs
668
        if isinstance(catalog, (list, tuple)):
669
            catalogs.extend(map(get_tool, catalog))
670
        else:
671
            catalogs.append(get_tool(catalog))
672
673
    # Cleanup: Avoid duplicate catalogs
674
    catalogs = list(set(catalogs)) or [get_portal_catalog()]
675
676
    # We only support **single** catalog queries
677
    if len(catalogs) > 1:
678
        fail("Multi Catalog Queries are not supported!")
679
680
    return catalogs[0](query)
681
682
683
def safe_getattr(brain_or_object, attr, default=_marker):
684
    """Return the attribute value
685
686
    :param brain_or_object: A single catalog brain or content object
687
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
688
    :param attr: Attribute name
689
    :type attr: str
690
    :returns: Attribute value
691
    :rtype: obj
692
    """
693
    try:
694
        value = getattr(brain_or_object, attr, _marker)
695
        if value is _marker:
696
            if default is not _marker:
697
                return default
698
            fail("Attribute '{}' not found.".format(attr))
699
        if callable(value):
700
            return value()
701
        return value
702
    except Unauthorized:
703
        if default is not _marker:
704
            return default
705
        fail("You are not authorized to access '{}' of '{}'.".format(
706
            attr, repr(brain_or_object)))
707
708
709
def get_portal_catalog():
710
    """Get the portal catalog tool
711
712
    :returns: Portal Catalog Tool
713
    """
714
    return get_tool("portal_catalog")
715
716
717
def get_review_history(brain_or_object, rev=True):
718
    """Get the review history for the given brain or context.
719
720
    :param brain_or_object: A single catalog brain or content object
721
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
722
    :returns: Workflow history
723
    :rtype: [{}, ...]
724
    """
725
    obj = get_object(brain_or_object)
726
    review_history = []
727
    try:
728
        workflow = get_tool("portal_workflow")
729
        review_history = workflow.getInfoFor(obj, 'review_history')
730
    except WorkflowException as e:
731
        message = str(e)
732
        logger.error("Cannot retrieve review_history on {}: {}".format(
733
            obj, message))
734
    if not isinstance(review_history, (list, tuple)):
735
        logger.error("get_review_history: expected list, received {}".format(
736
            review_history))
737
        review_history = []
738
739
    if isinstance(review_history, tuple):
740
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
741
        # tuple when "review_history" is passed in:
742
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
743
        #
744
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
745
        # Expression.StateChangeInfo, that always returns a list, except when no
746
        # review_history is found:
747
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
748
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
749
        review_history = list(review_history)
750
751
    if rev is True:
752
        review_history.reverse()
753
    return review_history
754
755
756
def get_revision_history(brain_or_object):
757
    """Get the revision history for the given brain or context.
758
759
    :param brain_or_object: A single catalog brain or content object
760
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
761
    :returns: Workflow history
762
    :rtype: obj
763
    """
764
    obj = get_object(brain_or_object)
765
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
766
    return chv.fullHistory()
767
768
769
def get_workflows_for(brain_or_object):
770
    """Get the assigned workflows for the given brain or context.
771
772
    Note: This function supports also the portal_type as parameter.
773
774
    :param brain_or_object: A single catalog brain or content object
775
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
776
    :returns: Assigned Workflows
777
    :rtype: tuple
778
    """
779
    workflow = ploneapi.portal.get_tool("portal_workflow")
780
    if isinstance(brain_or_object, six.string_types):
781
        return workflow.getChainFor(brain_or_object)
782
    obj = get_object(brain_or_object)
783
    return workflow.getChainFor(obj)
784
785
786
def get_workflow_status_of(brain_or_object, state_var="review_state"):
787
    """Get the current workflow status of the given brain or context.
788
789
    :param brain_or_object: A single catalog brain or content object
790
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
791
    :param state_var: The name of the state variable
792
    :type state_var: string
793
    :returns: Status
794
    :rtype: str
795
    """
796
    # Try to get the state from the catalog brain first
797
    if is_brain(brain_or_object):
798
        if state_var in brain_or_object.schema():
799
            return brain_or_object[state_var]
800
801
    # Retrieve the sate from the object
802
    workflow = get_tool("portal_workflow")
803
    obj = get_object(brain_or_object)
804
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
805
806
807
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
808
    """Get the previous workflow status of the object
809
810
    :param brain_or_object: A single catalog brain or content object
811
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
812
    :param skip: Workflow states to skip
813
    :type skip: tuple/list
814
    :returns: status
815
    :rtype: str
816
    """
817
818
    skip = isinstance(skip, (list, tuple)) and skip or []
819
    history = get_review_history(brain_or_object)
820
821
    # Remove consecutive duplicates, some transitions might happen more than
822
    # once consecutively (e.g. publish)
823
    history = map(lambda i: i[0], groupby(history))
824
825
    for num, item in enumerate(history):
826
        # skip the current history entry
827
        if num == 0:
828
            continue
829
        status = item.get("review_state")
830
        if status in skip:
831
            continue
832
        return status
833
    return default
834
835
836
def get_creation_date(brain_or_object):
837
    """Get the creation date of the brain or object
838
839
    :param brain_or_object: A single catalog brain or content object
840
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
841
    :returns: Creation date
842
    :rtype: DateTime
843
    """
844
    created = getattr(brain_or_object, "created", None)
845
    if created is None:
846
        fail("Object {} has no creation date ".format(
847
             repr(brain_or_object)))
848
    if callable(created):
849
        return created()
850
    return created
851
852
853
def get_modification_date(brain_or_object):
854
    """Get the modification date of the brain or object
855
856
    :param brain_or_object: A single catalog brain or content object
857
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
858
    :returns: Modification date
859
    :rtype: DateTime
860
    """
861
    modified = getattr(brain_or_object, "modified", None)
862
    if modified is None:
863
        fail("Object {} has no modification date ".format(
864
             repr(brain_or_object)))
865
    if callable(modified):
866
        return modified()
867
    return modified
868
869
870
def get_review_status(brain_or_object):
871
    """Get the `review_state` of an object
872
873
    :param brain_or_object: A single catalog brain or content object
874
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
875
    :returns: Value of the review_status variable
876
    :rtype: String
877
    """
878
    if is_brain(brain_or_object):
879
        return brain_or_object.review_state
880
    return get_workflow_status_of(brain_or_object, state_var="review_state")
881
882
883
def is_active(brain_or_object):
884
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
885
886
    :param brain_or_object: A single catalog brain or content object
887
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
888
    :returns: False if the object is in the state 'inactive' or 'cancelled'
889
    :rtype: bool
890
    """
891
    if get_review_status(brain_or_object) in ["cancelled", "inactive"]:
892
        return False
893
    return True
894
895
896
def get_catalogs_for(brain_or_object, default="portal_catalog"):
897
    """Get all registered catalogs for the given portal_type, catalog brain or
898
    content object
899
900
    :param brain_or_object: The portal_type, a catalog brain or content object
901
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
902
    :returns: List of supported catalogs
903
    :rtype: list
904
    """
905
    archetype_tool = get_tool("archetype_tool", default=None)
906
    if archetype_tool is None:
907
        # return the default catalog
908
        return [get_tool(default, default="portal_catalog")]
909
910
    catalogs = []
911
912
    # get the registered catalogs for portal_type
913
    if is_object(brain_or_object):
914
        catalogs = archetype_tool.getCatalogsByType(
915
            get_portal_type(brain_or_object))
916
    if isinstance(brain_or_object, six.string_types):
917
        catalogs = archetype_tool.getCatalogsByType(brain_or_object)
918
919
    if not catalogs:
920
        return [get_tool(default)]
921
    return catalogs
922
923
924
def get_transitions_for(brain_or_object):
925
    """List available workflow transitions for all workflows
926
927
    :param brain_or_object: A single catalog brain or content object
928
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
929
    :returns: All possible available and allowed transitions
930
    :rtype: list[dict]
931
    """
932
    workflow = get_tool('portal_workflow')
933
    transitions = []
934
    instance = get_object(brain_or_object)
935
    for wfid in get_workflows_for(brain_or_object):
936
        wf = workflow[wfid]
937
        tlist = wf.getTransitionsFor(instance)
938
        transitions.extend([t for t in tlist if t not in transitions])
939
    return transitions
940
941
942
def do_transition_for(brain_or_object, transition):
943
    """Performs a workflow transition for the passed in object.
944
945
    :param brain_or_object: A single catalog brain or content object
946
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
947
    :returns: The object where the transtion was performed
948
    """
949
    if not isinstance(transition, six.string_types):
950
        fail("Transition type needs to be string, got '%s'" % type(transition))
951
    obj = get_object(brain_or_object)
952
    try:
953
        ploneapi.content.transition(obj, transition)
954
    except ploneapi.exc.InvalidParameterError as e:
955
        fail("Failed to perform transition '{}' on {}: {}".format(
956
             transition, obj, str(e)))
957
    return obj
958
959
960
def get_roles_for_permission(permission, brain_or_object):
961
    """Get a list of granted roles for the given permission on the object.
962
963
    :param brain_or_object: A single catalog brain or content object
964
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
965
    :returns: Roles for the given Permission
966
    :rtype: list
967
    """
968
    obj = get_object(brain_or_object)
969
    allowed = set(rolesForPermissionOn(permission, obj))
970
    return sorted(allowed)
971
972
973
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
974
    """Checks if the passed in object is versionable.
975
976
    :param brain_or_object: A single catalog brain or content object
977
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
978
    :returns: True if the object is versionable
979
    :rtype: bool
980
    """
981
    pr = get_tool("portal_repository")
982
    obj = get_object(brain_or_object)
983
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
984
        and pr.isVersionable(obj)
985
986
987
def get_version(brain_or_object):
988
    """Get the version of the current object
989
990
    :param brain_or_object: A single catalog brain or content object
991
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
992
    :returns: The current version of the object, or None if not available
993
    :rtype: int or None
994
    """
995
    obj = get_object(brain_or_object)
996
    if not is_versionable(obj):
997
        return None
998
    return getattr(aq_base(obj), "version_id", 0)
999
1000
1001
def get_view(name, context=None, request=None, default=None):
1002
    """Get the view by name
1003
1004
    :param name: The name of the view
1005
    :type name: str
1006
    :param context: The context to query the view
1007
    :type context: ATContentType/DexterityContentType/CatalogBrain
1008
    :param request: The request to query the view
1009
    :type request: HTTPRequest object
1010
    :returns: HTTP Request
1011
    :rtype: Products.Five.metaclass View object
1012
    """
1013
    context = context or get_portal()
1014
    request = request or get_request() or None
1015
    view = queryMultiAdapter((get_object(context), request), name=name)
1016
    if view is None:
1017
        return default
1018
    return view
1019
1020
1021
def get_request():
1022
    """Get the global request object
1023
1024
    :returns: HTTP Request
1025
    :rtype: HTTPRequest object
1026
    """
1027
    return globalrequest.getRequest()
1028
1029
1030
def get_test_request():
1031
    """Get the TestRequest object
1032
    """
1033
    request = TestRequest()
1034
    directlyProvides(request, IAttributeAnnotatable)
1035
    return request
1036
1037
1038
def get_group(group_or_groupname):
1039
    """Return Plone Group
1040
1041
    :param group_or_groupname: Plone group or the name of the group
1042
    :type groupname:  GroupData/str
1043
    :returns: Plone GroupData
1044
    """
1045
    if not group_or_groupname:
1046
1047
        return None
1048
    if hasattr(group_or_groupname, "_getGroup"):
1049
        return group_or_groupname
1050
    gtool = get_tool("portal_groups")
1051
    return gtool.getGroupById(group_or_groupname)
1052
1053
1054
def get_user(user_or_username):
1055
    """Return Plone User
1056
1057
    :param user_or_username: Plone user or user id
1058
    :returns: Plone MemberData
1059
    """
1060
    user = None
1061
    if isinstance(user_or_username, MemberData):
1062
        user = user_or_username
1063
    if isinstance(user_or_username, six.string_types):
1064
        user = get_member_by_login_name(get_portal(), user_or_username, False)
1065
    return user
1066
1067
1068
def get_user_properties(user_or_username):
1069
    """Return User Properties
1070
1071
    :param user_or_username: Plone group identifier
1072
    :returns: Plone MemberData
1073
    """
1074
    user = get_user(user_or_username)
1075
    if user is None:
1076
        return {}
1077
    if not callable(user.getUser):
1078
        return {}
1079
    out = {}
1080
    plone_user = user.getUser()
1081
    for sheet in plone_user.listPropertysheets():
1082
        ps = plone_user.getPropertysheet(sheet)
1083
        out.update(dict(ps.propertyItems()))
1084
    return out
1085
1086
1087
def get_users_by_roles(roles=None):
1088
    """Search Plone users by their roles
1089
1090
    :param roles: Plone role name or list of roles
1091
    :type roles:  list/str
1092
    :returns: List of Plone users having the role(s)
1093
    """
1094
    if not isinstance(roles, (tuple, list)):
1095
        roles = [roles]
1096
    mtool = get_tool("portal_membership")
1097
    return mtool.searchForMembers(roles=roles)
1098
1099
1100
def get_current_user():
1101
    """Returns the current logged in user
1102
1103
    :returns: Current User
1104
    """
1105
    return ploneapi.user.get_current()
1106
1107
1108
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1109
    """Returns the associated contact of a Plone user
1110
1111
    If the user passed in has no contact associated, return None.
1112
    The `contact_types` parameter filter the portal types for the search.
1113
1114
    :param: Plone user
1115
    :contact_types: List with the contact portal types to search
1116
    :returns: Contact associated to the Plone user or None
1117
    """
1118
    if not user:
1119
        return None
1120
1121
    query = {'portal_type': contact_types, 'getUsername': user.id}
1122
    brains = search(query, catalog='portal_catalog')
1123
    if not brains:
1124
        return None
1125
1126
    if len(brains) > 1:
1127
        # Oops, the user has multiple contacts assigned, return None
1128
        contacts = map(lambda c: c.Title, brains)
1129
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1130
        err_msg = err_msg.format(user.id, ','.join(contacts))
1131
        logger.error(err_msg)
1132
        return None
1133
1134
    return get_object(brains[0])
1135
1136
1137
def get_user_client(user_or_contact):
1138
    """Returns the client of the contact of a Plone user
1139
1140
    If the user passed in has no contact or does not belong to any client,
1141
    returns None.
1142
1143
    :param: Plone user or contact
1144
    :returns: Client the contact of the Plone user belongs to
1145
    """
1146
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1147
        # Lab contacts cannot belong to a client
1148
        return None
1149
1150
    if not IContact.providedBy(user_or_contact):
1151
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1152
        if IContact.providedBy(contact):
1153
            return get_user_client(contact)
1154
        return None
1155
1156
    client = get_parent(user_or_contact)
1157
    if client and IClient.providedBy(client):
1158
        return client
1159
1160
    return None
1161
1162
1163
def get_current_client():
1164
    """Returns the current client the current logged in user belongs to, if any
1165
1166
    :returns: Client the current logged in user belongs to or None
1167
    """
1168
    return get_user_client(get_current_user())
1169
1170
1171
def get_cache_key(brain_or_object):
1172
    """Generate a cache key for a common brain or object
1173
1174
    :param brain_or_object: A single catalog brain or content object
1175
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1176
    :returns: Cache Key
1177
    :rtype: str
1178
    """
1179
    key = [
1180
        get_portal_type(brain_or_object),
1181
        get_id(brain_or_object),
1182
        get_uid(brain_or_object),
1183
        # handle different domains gracefully
1184
        get_url(brain_or_object),
1185
        # Return the microsecond since the epoch in GMT
1186
        get_modification_date(brain_or_object).micros(),
1187
    ]
1188
    return "-".join(map(lambda x: str(x), key))
1189
1190
1191
def bika_cache_key_decorator(method, self, brain_or_object):
1192
    """Bika cache key decorator usable for
1193
1194
    :param brain_or_object: A single catalog brain or content object
1195
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1196
    :returns: Cache Key
1197
    :rtype: str
1198
    """
1199
    if brain_or_object is None:
1200
        raise DontCache
1201
    return get_cache_key(brain_or_object)
1202
1203
1204
def normalize_id(string):
1205
    """Normalize the id
1206
1207
    :param string: A string to normalize
1208
    :type string: str
1209
    :returns: Normalized ID
1210
    :rtype: str
1211
    """
1212
    if not isinstance(string, six.string_types):
1213
        fail("Type of argument must be string, found '{}'"
1214
             .format(type(string)))
1215
    # get the id nomalizer utility
1216
    normalizer = getUtility(IIDNormalizer).normalize
1217
    return normalizer(string)
1218
1219
1220
def normalize_filename(string):
1221
    """Normalize the filename
1222
1223
    :param string: A string to normalize
1224
    :type string: str
1225
    :returns: Normalized ID
1226
    :rtype: str
1227
    """
1228
    if not isinstance(string, six.string_types):
1229
        fail("Type of argument must be string, found '{}'"
1230
             .format(type(string)))
1231
    # get the file nomalizer utility
1232
    normalizer = getUtility(IFileNameNormalizer).normalize
1233
    return normalizer(string)
1234
1235
1236
def is_uid(uid, validate=False):
1237
    """Checks if the passed in uid is a valid UID
1238
1239
    :param uid: The uid to check
1240
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1241
    True, also verifies if a brain exists for the uid passed in
1242
    :type uid: string
1243
    :return: True if a valid uid
1244
    :rtype: bool
1245
    """
1246
    if not isinstance(uid, six.string_types):
1247
        return False
1248
    if uid == '0':
1249
        return True
1250
    if len(uid) != 32:
1251
        return False
1252
    if not UID_RX.match(uid):
1253
        return False
1254
    if not validate:
1255
        return True
1256
1257
    # Check if a brain for this uid exists
1258
    uc = get_tool('uid_catalog')
1259
    brains = uc(UID=uid)
1260
    if brains:
1261
        assert (len(brains) == 1)
1262
    return len(brains) > 0
1263
1264
1265
def is_date(date):
1266
    """Checks if the passed in value is a valid Zope's DateTime
1267
1268
    :param date: The date to check
1269
    :type date: DateTime
1270
    :return: True if a valid date
1271
    :rtype: bool
1272
    """
1273
    if not date:
1274
        return False
1275
    return isinstance(date, (DateTime, datetime))
1276
1277
1278
def to_date(value, default=None):
1279
    """Tries to convert the passed in value to Zope's DateTime
1280
1281
    :param value: The value to be converted to a valid DateTime
1282
    :type value: str, DateTime or datetime
1283
    :return: The DateTime representation of the value passed in or default
1284
    """
1285
    if isinstance(value, DateTime):
1286
        return value
1287
    if not value:
1288
        if default is None:
1289
            return None
1290
        return to_date(default)
1291
    try:
1292
        if isinstance(value, str) and '.' in value:
1293
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
1294
            return DateTime(value, datefmt='international')
1295
        return DateTime(value)
1296
    except (TypeError, ValueError, DateTimeError):
1297
        return to_date(default)
1298
1299
1300
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1301
               round_to_int=True):
1302
    """Returns the computed total number of minutes
1303
    """
1304
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1305
        float(seconds)/60 + float(milliseconds)/1000/60
1306
    return int(round(total)) if round_to_int else total
1307
1308
1309
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1310
    """Returns a representation of time in a string in xd yh zm format
1311
    """
1312
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1313
                         seconds=seconds, milliseconds=milliseconds)
1314
    delta = timedelta(minutes=int(round(minutes)))
1315
    d = delta.days
1316
    h = delta.seconds // 3600
1317
    m = (delta.seconds // 60) % 60
1318
    m = m and "{}m ".format(str(m)) or ""
1319
    d = d and "{}d ".format(str(d)) or ""
1320
    if m and d:
1321
        h = "{}h ".format(str(h))
1322
    else:
1323
        h = h and "{}h ".format(str(h)) or ""
1324
    return "".join([d, h, m]).strip()
1325
1326
1327
def to_int(value, default=_marker):
1328
    """Tries to convert the value to int.
1329
    Truncates at the decimal point if the value is a float
1330
1331
    :param value: The value to be converted to an int
1332
    :return: The resulting int or default
1333
    """
1334
    if is_floatable(value):
1335
        value = to_float(value)
1336
    try:
1337
        return int(value)
1338
    except (TypeError, ValueError):
1339
        if default is None:
1340
            return default
1341
        if default is not _marker:
1342
            return to_int(default)
1343
        fail("Value %s cannot be converted to int" % repr(value))
1344
1345
1346
def is_floatable(value):
1347
    """Checks if the passed in value is a valid floatable number
1348
1349
    :param value: The value to be evaluated as a float number
1350
    :type value: str, float, int
1351
    :returns: True if is a valid float number
1352
    :rtype: bool"""
1353
    try:
1354
        float(value)
1355
        return True
1356
    except (TypeError, ValueError):
1357
        return False
1358
1359
1360
def to_float(value, default=_marker):
1361
    """Converts the passed in value to a float number
1362
1363
    :param value: The value to be converted to a floatable number
1364
    :type value: str, float, int
1365
    :returns: The float number representation of the passed in value
1366
    :rtype: float
1367
    """
1368
    if not is_floatable(value):
1369
        if default is not _marker:
1370
            return to_float(default)
1371
        fail("Value %s is not floatable" % repr(value))
1372
    return float(value)
1373
1374
1375
def to_searchable_text_metadata(value):
1376
    """Parse the given metadata value to searchable text
1377
1378
    :param value: The raw value of the metadata column
1379
    :returns: Searchable and translated unicode value or None
1380
    """
1381
    if not value:
1382
        return u""
1383
    if value is Missing.Value:
1384
        return u""
1385
    if is_uid(value):
1386
        return u""
1387
    if isinstance(value, (bool)):
1388
        return u""
1389
    if isinstance(value, (list, tuple)):
1390
        values = map(to_searchable_text_metadata, value)
1391
        values = filter(None, values)
1392
        return " ".join(values)
1393
    if isinstance(value, dict):
1394
        return to_searchable_text_metadata(value.values())
1395
    if is_date(value):
1396
        return value.strftime("%Y-%m-%d")
1397
    if is_at_content(value):
1398
        return to_searchable_text_metadata(get_title(value))
1399
    if not isinstance(value, six.string_types):
1400
        value = str(value)
1401
    return safe_unicode(value)
1402
1403
1404
def get_registry_record(name, default=None):
1405
    """Returns the value of a registry record
1406
1407
    :param name: [required] name of the registry record
1408
    :type name: str
1409
    :param default: The value returned if the record is not found
1410
    :type default: anything
1411
    :returns: value of the registry record
1412
    """
1413
    return ploneapi.portal.get_registry_record(name, default=default)
1414
1415
1416
def to_display_list(pairs, sort_by="key", allow_empty=True):
1417
    """Create a Plone DisplayList from list items
1418
1419
    :param pairs: list of key, value pairs
1420
    :param sort_by: Sort the items either by key or value
1421
    :param allow_empty: Allow to select an empty value
1422
    :returns: Plone DisplayList
1423
    """
1424
    dl = DisplayList()
1425
1426
    if isinstance(pairs, six.string_types):
1427
        pairs = [pairs, pairs]
1428
    for pair in pairs:
1429
        # pairs is a list of lists -> add each pair
1430
        if isinstance(pair, (tuple, list)):
1431
            dl.add(*pair)
1432
        # pairs is just a single pair -> add it and stop
1433
        if isinstance(pair, six.string_types):
1434
            dl.add(*pairs)
1435
            break
1436
1437
    # add the empty option
1438
    if allow_empty:
1439
        dl.add("", "")
1440
1441
    # sort by key/value
1442
    if sort_by == "key":
1443
        dl = dl.sortedByKey()
1444
    elif sort_by == "value":
1445
        dl = dl.sortedByValue()
1446
1447
    return dl
1448
1449
1450
def text_to_html(text, wrap="p", encoding="utf8"):
1451
    """Convert `\n` sequences in the text to HTML `\n`
1452
1453
    :param text: Plain text to convert
1454
    :param wrap: Toggle to wrap the text in a
1455
    :returns: HTML converted and encoded text
1456
    """
1457
    if not text:
1458
        return ""
1459
    # handle text internally as unicode
1460
    text = safe_unicode(text)
1461
    # replace newline characters with HTML entities
1462
    html = text.replace("\n", "<br/>")
1463
    if wrap:
1464
        html = u"<{tag}>{html}</{tag}>".format(
1465
            tag=wrap, html=html)
1466
    # return encoded html
1467
    return html.encode(encoding)
1468
1469
1470
def to_utf8(string, default=_marker):
1471
    """Encode string to UTF8
1472
1473
    :param string: String to be encoded to UTF8
1474
    :returns: UTF8 encoded string
1475
    """
1476
    if not isinstance(string, six.string_types):
1477
        if default is _marker:
1478
            fail("Expected string type, got '%s'" % type(string))
1479
        return default
1480
    return safe_unicode(string).encode("utf8")
1481
1482
1483
def is_temporary(obj):
1484
    """Returns whether the given object is temporary or not
1485
1486
    :param obj: the object to evaluate
1487
    :returns: True if the object is temporary
1488
    """
1489
    if UID_RX.match(obj.id):
1490
        return True
1491
1492
    parent = aq_parent(aq_inner(obj))
1493
    if not parent:
1494
        return True
1495
1496
    if UID_RX.match(parent.id):
1497
        return True
1498
1499
    if is_at_content(obj):
1500
        # Checks to see if we are created inside the portal_factory. We don't
1501
        # rely here on AT's isFactoryContained because the function is patched
1502
        meta_type = getattr(aq_base(parent), "meta_type", "")
1503
        return meta_type == "TempFolder"
1504
1505
    return False
1506