Passed
Push — 2.x ( 513331...6cc1a4 )
by Ramon
06:17 queued 51s
created

bika.lims.api.copy_object()   F

Complexity

Conditions 15

Size

Total Lines 85
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 50
dl 0
loc 85
rs 2.9998
c 0
b 0
f 0
cc 15
nop 5

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like bika.lims.api.copy_object() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import re
23
from collections import OrderedDict
24
from datetime import datetime
25
from datetime import timedelta
26
from itertools import groupby
27
28
import six
29
30
import Missing
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from Acquisition import aq_base
33
from Acquisition import aq_inner
34
from Acquisition import aq_parent
35
from bika.lims import logger
36
from bika.lims.interfaces import IClient
37
from bika.lims.interfaces import IContact
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from DateTime.interfaces import DateTimeError
41
from plone import api as ploneapi
42
from plone.api.exc import InvalidParameterError
43
from plone.app.layout.viewlets.content import ContentHistoryView
44
from plone.behavior.interfaces import IBehaviorAssignable
45
from plone.dexterity.interfaces import IDexterityContent
46
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
47
from plone.i18n.normalizer.interfaces import IIDNormalizer
48
from plone.memoize.volatile import DontCache
49
from Products.Archetypes.atapi import DisplayList
50
from Products.Archetypes.BaseObject import BaseObject
51
from Products.Archetypes.event import ObjectInitializedEvent
52
from Products.Archetypes.utils import mapply
53
from Products.CMFCore.interfaces import IFolderish
54
from Products.CMFCore.interfaces import ISiteRoot
55
from Products.CMFCore.permissions import ModifyPortalContent
56
from Products.CMFCore.permissions import View
57
from Products.CMFCore.utils import getToolByName
58
from Products.CMFCore.WorkflowCore import WorkflowException
59
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
60
from Products.CMFPlone.utils import _createObjectByType
61
from Products.CMFPlone.utils import base_hasattr
62
from Products.CMFPlone.utils import safe_unicode
63
from Products.PlonePAS.tools.memberdata import MemberData
64
from Products.ZCatalog.interfaces import ICatalogBrain
65
from zope import globalrequest
66
from zope.annotation.interfaces import IAttributeAnnotatable
67
from zope.component import getUtility
68
from zope.component import queryMultiAdapter
69
from zope.component.interfaces import IFactory
70
from zope.event import notify
71
from zope.interface import directlyProvides
72
from zope.lifecycleevent import ObjectCreatedEvent
73
from zope.publisher.browser import TestRequest
74
from zope.schema import getFieldsInOrder
75
from zope.security.interfaces import Unauthorized
76
77
"""SENAITE LIMS Framework API
78
79
Please see bika.lims/docs/API.rst for documentation.
80
81
Architectural Notes:
82
83
Please add only functions that do a single thing for a single object.
84
85
Good: `def get_foo(brain_or_object)`
86
Bad:  `def get_foos(list_of_brain_objects)`
87
88
Why?
89
90
Because it makes things more complex. You can always use a pattern like this to
91
achieve the same::
92
93
    >>> foos = map(get_foo, list_of_brain_objects)
94
95
Please add a descriptive test in docs/API.rst for each of your functions.
96
97
Thanks.
98
"""
99
100
# Unique sentinel used as the default of optional `default`/`catalog`
# arguments in this module, so that `None` can be passed as a real value.
_marker = object()
101
102
# Compiled pattern: 32 characters out of [a-z0-9], anchored at the end of the
# string.
UID_RX = re.compile("[a-z0-9]{32}$")
103
104
105
class APIError(Exception):
    """Exception type raised by the helpers of this API module (see `fail`).
    """
107
108
109
def get_portal():
    """Return the current portal (site) object

    :returns: The object returned by `ploneapi.portal.getSite()`
    """
    site = ploneapi.portal.getSite()
    return site
115
116
117
def get_setup():
    """Return the object stored under the id `bika_setup` in the portal

    :returns: Result of `portal.get("bika_setup")`
    """
    return get_portal().get("bika_setup")
122
123
124
def get_bika_setup():
    """Alias of `get_setup`: return the `bika_setup` folder

    :returns: Whatever `get_setup()` returns
    """
    setup = get_setup()
    return setup
128
129
130
def get_senaite_setup():
    """Return the object stored under the id `setup` in the portal

    This is the new DX `setup` folder, as opposed to `bika_setup`.

    :returns: Result of `portal.get("setup")`
    """
    return get_portal().get("setup")
135
136
137
def create(container, portal_type, *args, **kwargs):
    """Create a new content object of the given type inside the container

    Most of this logic follows
    `Products.CMFCore.TypesTool._constructInstance`.

    Types whose FTI declares a `product` are created as AT objects with
    `_createObjectByType`; all other types are created with the `IFactory`
    utility named by the FTI.

    :param container: container that will hold the new object
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param id: (keyword) ID of the new object; a temporary ID is used if
        omitted
    :param title: (keyword) Title of the new object; "New <portal_type>" if
        omitted
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    obj_id = kwargs.pop("id", tmpID())
    obj_title = kwargs.pop("title", "New {}".format(portal_type))

    # look up the type information of the portal type
    type_info = get_tool("portal_types").getTypeInfo(portal_type)

    if not type_info.product:
        # factory based construction
        factory = getUtility(IFactory, type_info.factory)
        new_obj = factory(obj_id, *args, **kwargs)
        if hasattr(new_obj, "_setPortalTypeName"):
            new_obj._setPortalTypeName(type_info.getId())
        new_obj.title = safe_unicode(obj_title)
        notify(ObjectCreatedEvent(new_obj))
        # triggers ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(obj_id, new_obj)
        # an event handler might have renamed the object already, so it is
        # fetched again by its current id
        return container._getOb(new_obj.getId())

    # AT based construction
    new_obj = _createObjectByType(portal_type, container, obj_id)
    edit(new_obj, check_permissions=False, title=obj_title, **kwargs)
    if new_obj._at_rename_after_creation:
        # generate the final id
        new_obj._renameAfterCreation(check_auto_id=True)
    # creation is finished
    new_obj.unmarkCreationFlag()
    notify(ObjectInitializedEvent(new_obj))
    return new_obj
190
191
192
# Field names and field types whose values `copy_object` never copies
_COPY_SKIP_DEFAULTS = frozenset([
    "Products.Archetypes.Field.ComputedField",
    "UID",
    "id",
    "allowDiscussion",
    "contributors",
    "creation_date",
    "creators",
    "effectiveDate",
    "expirationDate",
    "language",
    "location",
    "modification_date",
    "rights",
    "subject",
])


def _is_copyable_field(field_name, field, source, skip, check_permission):
    """Tell whether the value of the field may be copied from source
    """
    # Skip fields by name
    if field_name in skip:
        return False
    # Skip fields of non-suitable types
    if hasattr(field, "getType") and field.getType() in skip:
        return False
    # Skip readonly fields
    if getattr(field, "readonly", False):
        return False
    # Skip fields the current user cannot read
    perm = getattr(field, "read_permission", View)
    if perm and not check_permission(perm, source):
        return False
    return True


def _get_raw_field_value(field, source):
    """Read the field value of source, preferring the raw getters of the field
    """
    # do not wake-up objects unnecessarily
    if hasattr(field, "getRaw"):
        return field.getRaw(source)
    if hasattr(field, "get_raw"):
        return field.get_raw(source)
    if hasattr(field, "getAccessor"):
        accessor = field.getAccessor(source)
        return accessor()
    return field.get(source)


def copy_object(source, container=None, portal_type=None, *args, **kwargs):
    """Creates a copy of the source object. If container is None, creates the
    copy inside the same container as the source. If portal_type is specified,
    creates a new object of this type, and copies the values from source fields
    to the destination object. Field values sent as kwargs have priority over
    the field values from source.

    :param source: object from which create a copy
    :type source: ATContentType/DexterityContentType/CatalogBrain
    :param container: destination container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: destination portal type
    :param skip: (keyword) iterable of additional field names or field types
        whose values must not be copied
    :returns: The new created object
    """
    # Prevent circular dependencies
    from security import check_permission

    source = get_object(source)

    # Use same container as source unless explicitly set. The check is done
    # against None, because a container that defines `__len__` evaluates as
    # false while it is empty and would be silently ignored otherwise.
    if container is None:
        container = get_parent(source)

    # Use same portal type as source unless explicitly set
    if not portal_type:
        portal_type = get_portal_type(source)

    # Extend the fields to skip with defaults
    skip = set(kwargs.pop("skip", None) or [])
    skip.update(_COPY_SKIP_DEFAULTS)

    # Update kwargs with the field values to copy from source
    for field_name, field in get_fields(source).items():
        # Prioritize field values passed as kwargs
        if field_name in kwargs:
            continue
        if not _is_copyable_field(
                field_name, field, source, skip, check_permission):
            continue

        field_value = _get_raw_field_value(field, source)

        # Do a hard copy of value if mutable type, so that source and copy do
        # not share the same container instance
        if isinstance(field_value, (list, dict, set)):
            field_value = copy.deepcopy(field_value)
        kwargs[field_name] = field_value

    # Create a copy
    return create(container, portal_type, *args, **kwargs)
277
278
279
def edit(obj, check_permissions=True, **kwargs):
    """Write the passed keyword values into the matching fields of the object

    Keywords that do not match any field of the object are ignored.

    :param obj: The object to update
    :param check_permissions: When True, the write permission of each field
        is checked before the value is set
    :type check_permissions: bool
    :raises ValueError: if a matching field is readonly
    :raises Unauthorized: if a matching field is not writeable
    """
    # Prevent circular dependencies
    from security import check_permission

    schema_fields = get_fields(obj)
    for field_name, new_value in kwargs.items():
        field = schema_fields.get(field_name, None)
        if field:
            # readonly fields cannot be updated
            if getattr(field, "readonly", False):
                raise ValueError(
                    "Field '{}' is readonly".format(field_name))

            # the field must be writable for the current user
            if check_permissions:
                permission = getattr(
                    field, "write_permission", ModifyPortalContent)
                if permission and not check_permission(permission, obj):
                    raise Unauthorized(
                        "Field '{}' is not writeable".format(field_name))

            # prefer the mutator of the field, if it provides one
            if hasattr(field, "getMutator"):
                mapply(field.getMutator(obj), new_value)
            else:
                field.set(obj, new_value)
307
308
309
def get_tool(name, context=None, default=_marker):
    """Get a portal tool by name

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: A portal object
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Returned when no tool with this name is found. A string
        is taken as the name of a fallback tool to look up instead.
    :returns: Portal Tool
    :raises APIError: if the tool is not found and no default was given
    """

    # Try first with the context
    if context is not None:
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (APIError, AttributeError) as e:
            # https://github.com/senaite/bika.lims/issues/396
            # NOTE: `warning` is used because `Logger.warn` is deprecated
            logger.warning(
                "get_tool::getToolByName({}, '{}') failed: {} "
                "-> falling back to plone.api.portal.get_tool('{}')"
                .format(repr(context), name, repr(e), name))
            return get_tool(name, default=default)

    # Try with the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        if isinstance(default, six.string_types):
            # the default is the name of another tool
            return get_tool(default)
        return default
340
341
342
def fail(msg=None):
    """Raise an `APIError` with the given message

    :param msg: Error message; "Reason not given." is used when None
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
348
349
350
def is_object(brain_or_object):
    """Tell whether the passed in object is supported by this API

    Supported are the portal root, supermodels, AT contents, dexterity
    contents and catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    # the checks run in this order and stop at the first match
    checks = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    for check in checks:
        if check(brain_or_object):
            return True
    return False
368
369
370
def get_object(brain_object_uid, default=_marker):
    """Get the full content object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Returned instead of raising when the passed in value is
        not supported or when no object is found for the passed in UID
    :returns: The full object
    :raises APIError: if the object cannot be resolved and no default is given
    """
    if is_uid(brain_object_uid):
        # forward the default, so that an unknown UID honours it as well
        return get_object_by_uid(brain_object_uid, default=default)
    elif is_supermodel(brain_object_uid):
        return brain_object_uid.instance
    if not is_object(brain_object_uid):
        if default is _marker:
            fail("{} is not supported.".format(repr(brain_object_uid)))
        return default
    if is_brain(brain_object_uid):
        return brain_object_uid.getObject()
    return brain_object_uid
389
390
391
def is_portal(brain_or_object):
    """Tell whether the passed in object provides `ISiteRoot`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is the portal root object
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
400
401
402
def is_brain(brain_or_object):
    """Tell whether the passed in object provides `ICatalogBrain`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a catalog brain
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
411
412
413
def is_supermodel(brain_or_object):
    """Tell whether the passed in object provides `ISuperModel`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a supermodel
    :rtype: bool
    """
    # imported locally to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel
    provides_supermodel = ISuperModel.providedBy(brain_or_object)
    return provides_supermodel
424
425
426
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object provides `IDexterityContent`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a dexterity content type
    :rtype: bool
    """
    provides_dexterity = IDexterityContent.providedBy(brain_or_object)
    return provides_dexterity
435
436
437
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an instance of the Archetypes
    `BaseObject`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an AT content type
    :rtype: bool
    """
    is_base_object = isinstance(brain_or_object, BaseObject)
    return is_base_object
446
447
448
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute of the passed in object wins (it is called
    if callable); otherwise the full object is checked for `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))
    if callable(brain_or_object.is_folderish):
        return brain_or_object.is_folderish()
    return brain_or_object.is_folderish
461
462
463
def get_portal_type(brain_or_object):
    """Return the `portal_type` attribute of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: if the passed in object is not supported
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
474
475
476
def get_schema(brain_or_object):
    """Return the schema of the content

    Dexterity contents return the schema looked up from their FTI, AT
    contents return the result of `Schema()`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: for the portal root and for objects without a schema
    """
    content = get_object(brain_or_object)
    if is_portal(content):
        fail("get_schema can't return schema of portal root")
    if is_dexterity_content(content):
        types_tool = get_tool("portal_types")
        type_info = types_tool.getTypeInfo(content.portal_type)
        return type_info.lookupSchema()
    if not is_at_content(content):
        fail("{} has no Schema.".format(brain_or_object))
    return content.Schema()
493
494
495
def get_fields(brain_or_object):
    """Return the fields of the object, keyed by field name

    For dexterity contents, the fields of the schema come first, followed by
    the fields of the interfaces of all enumerated behaviors.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)
    if not is_dexterity_content(content):
        return OrderedDict(schema)

    # fields directly provided by the schema interface
    name_field_pairs = getFieldsInOrder(schema)
    # fields contributed by behaviors
    assignable = IBehaviorAssignable(content)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            name_field_pairs.extend(getFieldsInOrder(behavior.interface))
    return OrderedDict(name_field_pairs)
516
517
518
def get_id(brain_or_object):
    """Return the Plone ID of the object

    For brains, the `getId` attribute is preferred over the `id` attribute;
    if the brain has neither, the full object is asked.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for attr_name in ("getId", "id"):
            if base_hasattr(brain_or_object, attr_name):
                return getattr(brain_or_object, attr_name)
    return get_object(brain_or_object).getId()
532
533
534
def get_title(brain_or_object):
    """Return the title of the object

    The `Title` attribute of a brain is used when available; otherwise
    `Title()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    brain_has_title = is_brain(brain_or_object) and base_hasattr(
        brain_or_object, "Title")
    if not brain_has_title:
        return get_object(brain_or_object).Title()
    return brain_or_object.Title
545
546
547
def get_description(brain_or_object):
    """Return the description of the object

    The `Description` attribute of a brain is used when available; otherwise
    `Description()` of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    brain_has_description = is_brain(brain_or_object) and base_hasattr(
        brain_or_object, "Description")
    if not brain_has_description:
        return get_object(brain_or_object).Description()
    return brain_or_object.Description
559
560
561
def get_uid(brain_or_object):
    """Return the Plone UID of the object

    A passed in UID is returned unchanged and the portal root always gets
    the UID '0'.

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_uid(brain_or_object):
        return brain_or_object
    if is_portal(brain_or_object):
        return '0'
    brain_has_uid = is_brain(brain_or_object) and base_hasattr(
        brain_or_object, "UID")
    if not brain_has_uid:
        return get_object(brain_or_object).UID()
    return brain_or_object.UID
576
577
578
def get_url(brain_or_object):
    """Return the absolute URL of the object

    `getURL()` of a brain is used when available; otherwise `absolute_url()`
    of the full object is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    brain_has_url = is_brain(brain_or_object) and base_hasattr(
        brain_or_object, "getURL")
    if not brain_has_url:
        return get_object(brain_or_object).absolute_url()
    return brain_or_object.getURL()
589
590
591
def get_icon(brain_or_object, html_tag=True):
    """Get the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url. An
        empty string is returned if the type has no icon.
    :rtype: string
    """
    from xml.sax.saxutils import escape

    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url

    # The title ends up inside a double-quoted HTML attribute: escape the
    # markup characters, so that a title cannot break out of the attribute
    title = get_title(brain_or_object)
    if isinstance(title, six.string_types):
        title = escape(title, {'"': "&quot;"})

    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
        url=url, title=title)
    return tag
615
616
617
def get_object_by_uid(uid, default=_marker):
    """Return the object that belongs to the given UID

    The UID '0' is reserved for the portal object.

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Returned instead of raising when the UID is empty or no
        object is found
    :returns: Found object, or the default
    :raises APIError: if nothing is found and no default is given
    """
    # an empty UID cannot be resolved
    if not uid:
        if default is _marker:
            fail("get_object_by_uid requires UID as first argument; "
                 "got {} instead".format(uid))
        return default

    # the portal object has no real UID, '0' stands for it
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)
    if brain is not None:
        return get_object(brain)

    if default is _marker:
        fail("No object found for UID {}".format(uid))
    return default
644
645
646
def get_brain_by_uid(uid, default=None):
    """Return the brain of the `uid_catalog` that belongs to the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Returned when the passed in value is not a UID or when
        the catalog query does not return exactly one brain
    :returns: ZCatalog brain or the default
    """
    if not is_uid(uid):
        return default

    # query the UID catalog
    uid_catalog = get_tool("uid_catalog")
    matches = uid_catalog(UID=uid)

    # only an unambiguous result is accepted
    return matches[0] if len(matches) == 1 else default
664
665
666
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path (or absolute URL) of the object to find
    :type path: string
    :param default: Returned instead of raising when the path is empty or
        lies outside of the portal, and when the traversal finds nothing
    :returns: Found Object or None
    :raises APIError: if the path is empty or outside of the portal and no
        default is given
    """

    # nothing to do here
    if not path:
        if default is not _marker:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if default is not _marker:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    # Never hand the private sentinel to the traversal: it would be returned
    # to the caller for paths that do not resolve. Fall back to None instead.
    fallback = None if default is _marker else default
    return portal.restrictedTraverse(path, fallback)
699
700
701
def get_path(brain_or_object):
    """Return the physical path of the object as a "/" separated string

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        path_segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(path_segments)
    return brain_or_object.getPath()
712
713
714
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the object

    The portal root returns its own path. For brains, the parent path is
    derived from the path of the brain without waking up the object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if is_brain(brain_or_object):
        # everything before the last "/" is the path of the parent
        parent_path, _sep, _own_id = get_path(brain_or_object).rpartition("/")
        return parent_path
    parent = get_object(brain_or_object).aq_parent
    return get_path(parent)
728
729
730
def get_parent(brain_or_object, catalog_search=False):
    """Return the parent of the content/catalog brain

    With `catalog_search` enabled, the parent is searched in the
    `portal_catalog` and returned as brain. If the search has no results, the
    full parent object is returned instead. The parent of the portal root is
    the portal root itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if is_portal(brain_or_object):
        return get_portal()

    # plain acquisition parent
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # the parent is the portal object
    if parent_path == get_path(get_portal()):
        return get_portal()

    # look up the brain located exactly at the parent path
    catalog = get_portal_catalog()
    brains = catalog(path={"query": parent_path, "depth": 0})
    if brains:
        return brains[0]

    # nothing found in the catalog: return the parent object
    return get_object(brain_or_object).aq_parent
772
773
774
def search(query, catalog=_marker):
    """Run the query against a single catalog

    Without an explicit catalog, the first catalog registered for each of
    the queried portal types is used, with the `portal_catalog` as fallback.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises APIError: if the query is not a dict or more than one catalog
        would have to be queried
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # the queried portal types, always handled as a list
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is _marker:
        # no catalog specified: take the first registered/default catalog of
        # each queried portal type
        catalogs = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in portal_types
        ]
    elif isinstance(catalog, (list, tuple)):
        catalogs = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        catalogs = [get_tool(catalog)]

    # drop duplicates and fall back to the portal catalog
    unique_catalogs = list(set(catalogs)) or [get_portal_catalog()]

    # only **single** catalog queries are supported
    if len(unique_catalogs) > 1:
        fail("Multi Catalog Queries are not supported!")

    return unique_catalogs[0](query)
820
821
822
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of the attribute; callables are called first

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Returned instead of raising when the attribute is
        missing or the access is not authorized
    :returns: Attribute value
    :rtype: obj
    :raises APIError: if the attribute is missing or not accessible and no
        default is given
    """
    try:
        found = getattr(brain_or_object, attr, _marker)
        if found is _marker:
            if default is _marker:
                fail("Attribute '{}' not found.".format(attr))
            return default
        return found() if callable(found) else found
    except Unauthorized:
        if default is _marker:
            fail("You are not authorized to access '{}' of '{}'.".format(
                attr, repr(brain_or_object)))
        return default
846
847
848
def get_portal_catalog():
    """Shortcut to fetch the tool registered as `portal_catalog`

    :returns: Portal Catalog Tool
    """
    catalog = get_tool("portal_catalog")
    return catalog
854
855
856
def get_review_history(brain_or_object, rev=True):
    """Return the `review_history` workflow info of the brain or context.

    Workflow errors are logged and result in an empty history.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: When exactly `True`, the history is reversed before it is
        returned
    :type rev: bool
    :returns: Workflow history
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        wf_tool = get_tool("portal_workflow")
        history = wf_tool.getInfoFor(obj, 'review_history')
    except WorkflowException as exc:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(exc)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # Products.DCWorkflow.getInfoFor relies on Expression.StateChangeInfo
        # instead, which always returns a list, except when no review_history
        # is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        # anything that is neither a list nor a tuple is discarded
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
893
894
895
def get_revision_history(brain_or_object):
    """Return the full content history for the given brain or context.

    The history is computed by a `ContentHistoryView` created for the object
    and its `REQUEST` attribute (`None` if the request is not available).

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory()`
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
906
907
908
def get_workflows_for(brain_or_object):
    """Return the workflow chain assigned to the given brain or context.

    Note: A portal_type name (string) is accepted as well and is passed to
    the workflow tool unchanged.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    if not isinstance(target, six.string_types):
        # brains and objects need to be resolved to the content object first
        target = get_object(target)
    return wf_tool.getChainFor(target)
923
924
925
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the current workflow status of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status
    :rtype: str
    """
    # Prefer the value stored in the brain if the column is available
    if is_brain(brain_or_object) and state_var in brain_or_object.schema():
        return brain_or_object[state_var]

    # Otherwise ask the workflow tool for the state of the object
    wf_tool = get_tool("portal_workflow")
    return wf_tool.getInfoFor(
        ob=get_object(brain_or_object), name=state_var, default='')
944
945
946
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the workflow status the object had before the current one

    The review history is walked starting right after its first entry (the
    current one); the first `review_state` not listed in `skip` is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value returned if no previous status is found
    :returns: status
    :rtype: str
    """
    # anything that is not a list/tuple means "skip nothing"
    if not isinstance(skip, (list, tuple)):
        skip = []

    history = get_review_history(brain_or_object)

    # Collapse consecutive duplicates, because some transitions might happen
    # more than once in a row (e.g. publish)
    entries = (entry for entry, _group in groupby(history))

    # drop the current history entry
    next(entries, None)

    for entry in entries:
        status = entry.get("review_state")
        if status not in skip:
            return status
    return default
973
974
975
def get_creation_date(brain_or_object):
    """Return the value of `created` of the brain or object

    If `created` is callable, it is called and its result is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    """
    creation = getattr(brain_or_object, "created", None)
    if creation is None:
        message = "Object {} has no creation date ".format(
            repr(brain_or_object))
        fail(message)
    return creation() if callable(creation) else creation
990
991
992
def get_modification_date(brain_or_object):
    """Return the value of `modified` of the brain or object

    If `modified` is callable, it is called and its result is returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    modification = getattr(brain_or_object, "modified", None)
    if modification is None:
        message = "Object {} has no modification date ".format(
            repr(brain_or_object))
        fail(message)
    return modification() if callable(modification) else modification
1007
1008
1009
def get_review_status(brain_or_object):
    """Return the `review_state` of a brain or object

    Brains are read directly through their `review_state` attribute, all
    other objects are delegated to `get_workflow_status_of`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
1020
1021
1022
def is_active(brain_or_object):
    """Check that the review state is neither 'inactive' nor 'cancelled'.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    inactive_states = ["cancelled", "inactive"]
    return get_review_status(brain_or_object) not in inactive_states
1033
1034
1035
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Return the catalogs registered for the given portal_type, catalog
    brain or content object

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Name of the catalog used when nothing is registered
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", default=None)
    if at_tool is None:
        # no archetype tool available -> only the default catalog is known
        return [get_tool(default, default="portal_catalog")]

    registered = []

    # look up the catalogs registered for the portal_type
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        registered = at_tool.getCatalogsByType(portal_type)
    if isinstance(brain_or_object, six.string_types):
        registered = at_tool.getCatalogsByType(brain_or_object)

    return registered or [get_tool(default)]
1061
1062
1063
def get_transitions_for(brain_or_object):
    """Collect the transitions offered by all workflows of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    obj = get_object(brain_or_object)
    for wf_id in get_workflows_for(brain_or_object):
        offered = wf_tool[wf_id].getTransitionsFor(obj)
        # only add transitions not already provided by a previous workflow
        collected += [item for item in offered if item not in collected]
    return collected
1079
1080
1081
def do_transition_for(brain_or_object, transition):
    """Run the given workflow transition on the passed in object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: The id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    is_string = isinstance(transition, six.string_types)
    if not is_string:
        fail("Transition type needs to be string, got '%s'" % type(transition))
    obj = get_object(brain_or_object)
    try:
        ploneapi.content.transition(obj, transition)
    except ploneapi.exc.InvalidParameterError as exc:
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, obj, str(exc))
        fail(message)
    return obj
1097
1098
1099
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted, unique roles granted the permission on the object.

    :param permission: The permission to look up
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    obj = get_object(brain_or_object)
    roles = list(set(rolesForPermissionOn(permission, obj)))
    roles.sort()
    return roles
1110
1111
1112
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honour the `policy` argument; the default keeps the behavior callers
    # had when the policy id was hard-coded here
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
1124
1125
1126
def get_version(brain_or_object):
    """Return the `version_id` of the current object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object (0 if no `version_id` is
        set), or None if the object is not versionable
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # read from the unwrapped object to avoid acquiring the attribute
        return getattr(aq_base(obj), "version_id", 0)
    return None
1138
1139
1140
def get_view(name, context=None, request=None, default=None):
    """Look up a view by name for the given context and request

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view (the portal if omitted)
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view (the global request if
        omitted)
    :type request: HTTPRequest object
    :param default: Value returned if no view is registered under that name
    :returns: The view or the default value
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    adapted = (get_object(context), request)
    view = queryMultiAdapter(adapted, name=name)
    return default if view is None else view
1158
1159
1160
def get_request():
    """Return the request provided by `globalrequest.getRequest`

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
1167
1168
1169
def get_test_request():
    """Create a new TestRequest that provides IAttributeAnnotatable

    :returns: The freshly created TestRequest
    """
    test_request = TestRequest()
    directlyProvides(test_request, IAttributeAnnotatable)
    return test_request
1175
1176
1177
def get_group(group_or_groupname):
    """Resolve a Plone group

    Objects that have a `_getGroup` attribute are returned unchanged; any
    other non-empty value is looked up by id in `portal_groups`.

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname:  GroupData/str
    :returns: Plone GroupData, or None if a falsy value was passed in
    """
    group = group_or_groupname
    if not group:
        return None
    if not hasattr(group, "_getGroup"):
        group = get_tool("portal_groups").getGroupById(group)
    return group
1191
1192
1193
def get_user(user_or_username):
    """Resolve a Plone user

    Strings are looked up by login name, MemberData objects are returned
    unchanged and everything else resolves to None.

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData
    """
    if isinstance(user_or_username, six.string_types):
        return get_member_by_login_name(get_portal(), user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1205
1206
1207
def get_user_properties(user_or_username):
    """Return the merged property sheet items of a user

    :param user_or_username: Plone user or user id
    :returns: Dictionary with the properties of all property sheets, empty
        if the user cannot be resolved
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}
    plone_user = member.getUser()
    properties = {}
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        # later sheets override equally named properties of earlier ones
        for key, value in sheet.propertyItems():
            properties[key] = value
    return properties
1224
1225
1226
def get_users_by_roles(roles=None):
    """Search Plone users by their roles

    :param roles: Plone role name or list of roles. A single value is wrapped
        into a list before the search.
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    role_list = roles if isinstance(roles, (tuple, list)) else [roles]
    return get_tool("portal_membership").searchForMembers(roles=role_list)
1237
1238
1239
def get_current_user():
    """Return the user provided by `ploneapi.user.get_current`

    :returns: Current User
    """
    current = ploneapi.user.get_current()
    return current
1245
1246
1247
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Return the contact a Plone user is linked to

    None is returned if the user has no contact associated or if more than
    one contact is bound to the user (the latter is logged as an error).
    The `contact_types` parameter filters the portal types for the search.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    brains = search(
        {'portal_type': contact_types, 'getUsername': user.id},
        catalog='portal_catalog')
    if not brains:
        return None

    if len(brains) == 1:
        return get_object(brains[0])

    # Oops, the user has multiple contacts assigned, return None
    titles = [brain.Title for brain in brains]
    logger.error("User '{}' is bound to multiple Contacts '{}'".format(
        user.id, ','.join(titles)))
    return None
1274
1275
1276
def get_user_client(user_or_contact):
    """Return the client the contact of a Plone user belongs to

    None is returned if the user passed in has no contact or the contact
    does not belong to any client.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact:
        return None

    if ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # the client is the container of the contact
        parent = get_parent(user_or_contact)
        is_client = parent and IClient.providedBy(parent)
        return parent if is_client else None

    # a user was passed in: resolve the client contact first
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if not IContact.providedBy(contact):
        return None
    return get_user_client(contact)
1300
1301
1302
def get_current_client():
    """Return the client of the currently logged in user, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
1308
1309
1310
def get_cache_key(brain_or_object):
    """Build a cache key for a common brain or object

    The key joins portal type, id, UID, URL and modification time with "-".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # the URL makes the key differ between domains
        get_url(brain_or_object),
        # `micros()` of the modification date
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join([str(part) for part in parts])
1328
1329
1330
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object

    :param method: Unused by this key function
    :param self: Unused by this key function
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1341
1342
1343
def normalize_id(string):
    """Normalize a string with the registered IIDNormalizer utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    is_string = isinstance(string, six.string_types)
    if not is_string:
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    # delegate to the id normalizer utility
    return getUtility(IIDNormalizer).normalize(string)
1357
1358
1359
def normalize_filename(string):
    """Normalize a string with the registered IFileNameNormalizer utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    is_string = isinstance(string, six.string_types)
    if not is_string:
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    # delegate to the file name normalizer utility
    return getUtility(IFileNameNormalizer).normalize(string)
1373
1374
1375
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    The special uid '0' is always considered valid.

    :param uid: The uid to check
    :param validate: If False, only checks that uid is a string of 32
        characters matching the UID pattern. If True, also verifies that a
        brain exists for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    found = get_tool('uid_catalog')(UID=uid)
    if found:
        assert (len(found) == 1)
    return len(found) > 0
1402
1403
1404
def is_date(date):
    """Checks if the passed in value is a Zope DateTime or Python datetime

    :param date: The date to check
    :type date: DateTime
    :return: True if a valid date, False for falsy values and other types
    :rtype: bool
    """
    return bool(date) and isinstance(date, (DateTime, datetime))
1415
1416
1417
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Converted and returned instead if value is empty or
        cannot be converted
    :return: The DateTime representation of the value passed in or default
    """
    if isinstance(value, DateTime):
        return value
    if value:
        # strings containing dots are parsed with the international format:
        # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
        dotted = isinstance(value, str) and '.' in value
        options = {'datefmt': 'international'} if dotted else {}
        try:
            return DateTime(value, **options)
        except (TypeError, ValueError, DateTimeError):
            return to_date(default)
    return None if default is None else to_date(default)
1437
1438
1439
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Returns the computed total number of minutes

    :param round_to_int: If True, the total is rounded and returned as int,
        otherwise the float total is returned
    """
    from_days = float(days) * 24 * 60
    from_hours = float(hours) * 60
    whole_minutes = float(minutes)
    from_seconds = float(seconds) / 60
    from_millis = float(milliseconds) / 1000 / 60
    total = from_days + from_hours + whole_minutes + from_seconds + \
        from_millis
    if round_to_int:
        return int(round(total))
    return total
1446
1447
1448
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Returns a representation of time in a string in xd yh zm format

    Parts that are zero are left out, except for the hours, which are always
    shown when both days and minutes are present.
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))
    num_days = delta.days
    num_hours, remainder = divmod(delta.seconds, 3600)
    num_minutes = remainder // 60

    days_part = "{}d ".format(num_days) if num_days else ""
    minutes_part = "{}m ".format(num_minutes) if num_minutes else ""
    # keep "0h" between days and minutes, e.g. "1d 0h 5m"
    show_hours = num_hours or (days_part and minutes_part)
    hours_part = "{}h ".format(num_hours) if show_hours else ""
    return (days_part + hours_part + minutes_part).strip()
1464
1465
1466
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    :param value: The value to be converted to an int
    :param default: Fallback used when the conversion is not possible. `None`
        is returned as-is, any other default is converted with `to_int`
        itself. Without a default, `fail` is called.
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: floatable but infinite values ("inf", "-inf") have
        # no int representation and must take the default path as well
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1483
1484
1485
def is_floatable(value):
    """Checks if the passed in value is a valid floatable number

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool"""
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integers too large to be represented as a float
        return False
    return True
1497
1498
1499
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Converted and returned instead if value is not
        floatable. Without a default, `fail` is called.
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if not is_floatable(value):
        if default is _marker:
            fail("Value %s is not floatable" % repr(value))
        else:
            return to_float(default)
    return float(value)
1512
1513
1514
def float_to_string(value, default=_marker):
    """Convert a float value to string without exponential notation

    The whole fraction is preserved and trailing zeros are removed.
    Floatable strings are returned unchanged.

    :param value: The float value to be converted to a string
    :type value: str, float, int
    :param default: Returned if value is not floatable. Without a default,
        `fail` is called.
    :returns: String representation of the float w/o exponential notation
    :rtype: str
    """
    if not is_floatable(value):
        if default is _marker:
            fail("Value %s is not floatable" % repr(value))
        else:
            return default

    # Leave floatable string values unchanged
    if isinstance(value, six.string_types):
        return value

    number = float(value)
    text = str(number)

    # Inspect what follows the decimal point, e.g. "23e-26" for 1.23e-26,
    # or the whole text if there is no point, e.g. "1e-07" for 0.0000001
    tail = text.split(".")[-1]

    if "e-" in tail:
        fraction, _sep, zeros = tail.partition("e-")
        # the precision has to cover the fraction and the zeros
        precision = len(fraction) + int(zeros)
        text = ("{:.%df}" % precision).format(number)
    elif "e+" in tail:
        # positive exponents, e.g. 1e+16, don't need a fractional part
        text = "{:.0f}".format(number)

    # cut off trailing zeros
    if "." in text:
        text = text.rstrip("0").rstrip(".")

    return text
1558
1559
1560
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Empty values, missing values, UIDs and booleans result in an empty
    string. Lists, tuples and dicts are converted item by item and joined
    with a space.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value or an empty string
    """
    if not value or value is Missing.Value:
        return u""
    if is_uid(value) or isinstance(value, bool):
        return u""
    if isinstance(value, (list, tuple)):
        texts = [to_searchable_text_metadata(item) for item in value]
        return " ".join([text for text in texts if text])
    if isinstance(value, dict):
        return to_searchable_text_metadata(value.values())
    if is_date(value):
        from senaite.core.api.dtime import date_to_string
        return date_to_string(value, "%Y-%m-%d")
    if is_at_content(value):
        return to_searchable_text_metadata(get_title(value))
    text = value if isinstance(value, six.string_types) else str(value)
    return safe_unicode(text)
1588
1589
1590
def get_registry_record(name, default=None):
    """Look up a registry record through `ploneapi.portal`

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    record = ploneapi.portal.get_registry_record(name, default=default)
    return record
1600
1601
1602
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Build a Plone DisplayList from the given items

    :param pairs: list of key, value pairs, a single key/value pair or a
        single string (used as both key and value)
    :param sort_by: Sort the items either by "key" or "value"; any other
        value leaves the list unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a single string serves as key and value at the same time
    items = [pairs, pairs] if isinstance(pairs, six.string_types) else pairs
    for item in items:
        if isinstance(item, six.string_types):
            # the items are just a single pair -> add it and stop
            display_list.add(*items)
            break
        if isinstance(item, (tuple, list)):
            # the items are a list of pairs -> add each pair
            display_list.add(*item)

    # add the empty option
    if allow_empty:
        display_list.add("", "")

    # sort by key/value
    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
1634
1635
1636
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert newline characters in the text to HTML line breaks

    :param text: Plain text to convert
    :param wrap: Name of the HTML tag to wrap the converted text in; a falsy
        value disables the wrapping
    :param encoding: Encoding of the returned HTML
    :returns: HTML converted and encoded text, or an empty string if no
        text was given
    """
    if not text:
        return ""
    # work on unicode internally and turn newlines into line breaks
    markup = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        markup = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=markup)
    # hand back the encoded html
    return markup.encode(encoding)
1654
1655
1656
def to_utf8(string, default=_marker):
    """Encode string to UTF8

    :param string: String to be encoded to UTF8
    :param default: Returned if the value passed in is not a string. Without
        a default, `fail` is called.
    :returns: UTF8 encoded string
    """
    if isinstance(string, six.string_types):
        return safe_unicode(string).encode("utf8")
    if default is _marker:
        fail("Expected string type, got '%s'" % type(string))
    return default
1667
1668
1669
def is_temporary(obj):
    """Returns whether the given object is temporary or not

    An object counts as temporary if its id or the id of its parent matches
    the UID pattern, if it has no parent, or if it is AT content located
    inside a "TempFolder".

    :param obj: the object to evaluate
    :returns: True if the object is temporary
    """
    if UID_RX.match(obj.id):
        return True

    container = aq_parent(aq_inner(obj))
    if not container or UID_RX.match(container.id):
        return True

    if not is_at_content(obj):
        return False

    # Check whether the object lives inside the portal_factory. AT's
    # isFactoryContained is not used here because that function is patched
    return getattr(aq_base(container), "meta_type", "") == "TempFolder"
1692