Passed
Push — 2.x ( cc9902...4648d7 )
by Ramon
06:12
created

bika.lims.api.to_date()   A

Complexity

Conditions 2

Size

Total Lines 21
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 21
rs 9.85
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
import json
23
import re
24
from collections import OrderedDict
25
from datetime import datetime
26
from datetime import timedelta
27
from itertools import groupby
28
29
import Missing
30
import six
31
from AccessControl.PermissionRole import rolesForPermissionOn
32
from Acquisition import aq_base
33
from Acquisition import aq_inner
34
from Acquisition import aq_parent
35
from bika.lims import logger
36
from bika.lims.interfaces import IClient
37
from bika.lims.interfaces import IContact
38
from bika.lims.interfaces import ILabContact
39
from DateTime import DateTime
40
from plone import api as ploneapi
41
from plone.api.exc import InvalidParameterError
42
from plone.app.layout.viewlets.content import ContentHistoryView
43
from plone.behavior.interfaces import IBehaviorAssignable
44
from plone.dexterity.interfaces import IDexterityContent
45
from plone.dexterity.schema import SchemaInvalidatedEvent
46
from plone.dexterity.utils import addContentToContainer
47
from plone.dexterity.utils import createContent
48
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
49
from plone.i18n.normalizer.interfaces import IIDNormalizer
50
from plone.memoize.volatile import DontCache
51
from Products.Archetypes.atapi import DisplayList
52
from Products.Archetypes.BaseObject import BaseObject
53
from Products.Archetypes.event import ObjectInitializedEvent
54
from Products.Archetypes.utils import mapply
55
from Products.CMFCore.interfaces import IFolderish
56
from Products.CMFCore.interfaces import ISiteRoot
57
from Products.CMFCore.permissions import ModifyPortalContent
58
from Products.CMFCore.permissions import View
59
from Products.CMFCore.utils import getToolByName
60
from Products.CMFCore.WorkflowCore import WorkflowException
61
from Products.CMFPlone.RegistrationTool import get_member_by_login_name
62
from Products.CMFPlone.utils import _createObjectByType
63
from Products.CMFPlone.utils import base_hasattr
64
from Products.CMFPlone.utils import safe_unicode
65
from Products.PlonePAS.tools.memberdata import MemberData
66
from Products.ZCatalog.interfaces import ICatalogBrain
67
from senaite.core.interfaces import ITemporaryObject
68
from zope import globalrequest
69
from zope.annotation.interfaces import IAttributeAnnotatable
70
from zope.component import getUtility
71
from zope.component import queryMultiAdapter
72
from zope.event import notify
73
from zope.interface import alsoProvides
74
from zope.interface import directlyProvides
75
from zope.interface import noLongerProvides
76
from zope.publisher.browser import TestRequest
77
from zope.schema import getFieldsInOrder
78
from zope.security.interfaces import Unauthorized
79
80
"""SENAITE LIMS Framework API
81
82
Please see tests/doctests/API.rst for documentation.
83
84
Architectural Notes:
85
86
Please add only functions that do a single thing for a single object.
87
88
Good: `def get_foo(brain_or_object)`
89
Bad:  `def get_foos(list_of_brain_objects)`
90
91
Why?
92
93
Because it makes things more complex. You can always use a pattern like this to
94
achieve the same::
95
96
    >>> foos = map(get_foo, list_of_brain_objects)
97
98
Please add for all functions a descriptive test in tests/doctests/API.rst.
99
100
Thanks.
101
"""
102
103
# Unique sentinel used as the default of optional `default=` parameters, so
# that "no default given" can be told apart from an explicit `None`.
_marker = object()
104
105
# Pattern for 32 lowercase alphanumeric characters anchored at the end of the
# string. NOTE(review): presumably used with `.match()` to validate UIDs; the
# consumer is not visible in this part of the file.
UID_RX = re.compile("[a-z0-9]{32}$")
106
107
# Tool ids of the catalogs looked up by `get_uid_catalog()` and
# `get_portal_catalog()`.
UID_CATALOG = "uid_catalog"
108
PORTAL_CATALOG = "portal_catalog"
109
110
111
class APIError(Exception):
    """Exception raised by the API helpers of this module (see `fail`)."""
113
114
115
def get_portal():
    """Return the portal (site root) object

    :returns: The object returned by `plone.api.portal.getSite()`
    """
    site = ploneapi.portal.getSite()
    return site
121
122
123
def get_setup():
    """Return the `bika_setup` folder of the portal.

    :returns: The result of `portal.get("bika_setup")`
    """
    return get_portal().get("bika_setup")
128
129
130
def get_bika_setup():
    """Return the `bika_setup` folder (alias of `get_setup`).
    """
    setup = get_setup()
    return setup
134
135
136
def get_senaite_setup():
    """Return the new DX `setup` folder of the portal.

    :returns: The result of `portal.get("setup")`
    """
    return get_portal().get("setup")
141
142
143
def create(container, portal_type, *args, **kwargs):
    """Create a new content object inside the container

    Most of the logic follows the TypesTool,
    see: `Products.CMFCore.TypesTool._constructInstance`

    Types whose FTI has a `product` set are created as AT objects; all other
    types are created through the dexterity helpers.

    :param container: container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param id: (keyword) The id for the new content object
    :type id: string
    :param title: (keyword) The title for the new content object
    :type title: string
    :returns: The new created object
    """
    from bika.lims.utils import tmpID

    temp_id = tmpID()
    obj_id = kwargs.pop("id", "")
    obj_title = kwargs.pop("title", "")

    # look up the type information of the requested portal type
    fti = get_tool("portal_types").getTypeInfo(portal_type)

    if not fti.product:
        # no product set: build the content and add it to the container
        content = createContent(portal_type, **kwargs)
        content.id = obj_id
        content.title = obj_title
        return addContentToContainer(container, content)

    # AT object: fall back to the temporary id if no id was passed in
    obj = _createObjectByType(portal_type, container, obj_id or temp_id)
    # set the field values without checking the field write permissions
    edit(obj, check_permissions=False, title=obj_title, **kwargs)
    # generate the final id if the type asks for it
    if obj._at_rename_after_creation:
        obj._renameAfterCreation(check_auto_id=True)
    # the object is no longer under creation
    obj.unmarkCreationFlag()
    # tell subscribers that the object was initialized
    notify(ObjectInitializedEvent(obj))
    return obj
186
187
188
def copy_object(source, container=None, portal_type=None, *args, **kwargs):
    """Creates a copy of the source object.

    The field values of the source are read and handed over as keyword
    arguments to `create`. Field values sent as kwargs have priority over the
    field values from source.

    :param source: object from which create a copy
    :type source: ATContentType/DexterityContentType/CatalogBrain
    :param container: destination container. The parent of the source is used
        if omitted (or falsy)
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: destination portal type. The portal type of the source
        is used if omitted (or falsy)
    :param skip: (keyword) additional field names or field types that must not
        be copied
    :returns: The new created object
    """
    # Imported lazily to prevent circular dependencies. The explicit relative
    # form addresses the same sibling module as the implicit
    # `from security import ...`, but does not rely on Python 2 only
    # implicit relative imports.
    from .security import check_permission

    # Use same container as source unless explicitly set
    source = get_object(source)
    if not container:
        container = get_parent(source)

    # Use same portal type as source unless explicitly set
    if not portal_type:
        portal_type = get_portal_type(source)

    # Field names and field types to skip: caller-supplied ones plus defaults.
    # A set already gives constant-time membership checks.
    skip = set(kwargs.pop("skip", []))
    skip.update([
        "Products.Archetypes.Field.ComputedField",
        "UID",
        "id",
        "allowDiscussion",
        "contributors",
        "creation_date",
        "creators",
        "effectiveDate",
        "expirationDate",
        "language",
        "location",
        "modification_date",
        "rights",
        "subject",
    ])

    # Update kwargs with the field values to copy from source
    for field_name, field in get_fields(source).items():
        # Prioritize field values passed as kwargs
        if field_name in kwargs:
            continue
        # Skip framework internal fields by name
        if field_name in skip:
            continue
        # Skip fields of non-suitable types
        if hasattr(field, "getType") and field.getType() in skip:
            continue
        # Skip readonly fields
        if getattr(field, "readonly", False):
            continue
        # Skip non-readable fields
        perm = getattr(field, "read_permission", View)
        if perm and not check_permission(perm, source):
            continue

        # Prefer raw getters: do not wake-up objects unnecessarily
        if hasattr(field, "getRaw"):
            field_value = field.getRaw(source)
        elif hasattr(field, "get_raw"):
            field_value = field.get_raw(source)
        elif hasattr(field, "getAccessor"):
            accessor = field.getAccessor(source)
            field_value = accessor()
        else:
            field_value = field.get(source)

        # Do a hard copy of the value if mutable, so that source and copy do
        # not share the same container instance
        if isinstance(field_value, (list, dict, set)):
            field_value = copy.deepcopy(field_value)
        kwargs[field_name] = field_value

    # Create a copy
    return create(container, portal_type, *args, **kwargs)
273
274
275
def edit(obj, check_permissions=True, **kwargs):
    """Updates the values of object fields with the new values passed-in

    Keyword arguments that do not match a field of the object are ignored.

    :param obj: The object to update
    :param check_permissions: If True, the write permission of each field is
        checked before the value is set
    :type check_permissions: bool
    :raises ValueError: if a passed-in field is readonly
    :raises Unauthorized: if `check_permissions` is True and the write
        permission of a passed-in field is not granted
    """
    # Imported lazily to prevent circular dependencies. The explicit relative
    # form addresses the same sibling module as the implicit
    # `from security import ...`, but does not rely on Python 2 only
    # implicit relative imports.
    from .security import check_permission

    fields = get_fields(obj)
    for name, value in kwargs.items():
        field = fields.get(name, None)
        if not field:
            continue

        # cannot update readonly fields
        if getattr(field, "readonly", False):
            raise ValueError("Field '{}' is readonly".format(name))

        # check field writable permission
        if check_permissions:
            perm = getattr(field, "write_permission", ModifyPortalContent)
            if perm and not check_permission(perm, obj):
                raise Unauthorized("Field '{}' is not writeable".format(name))

        # Set the value: fields that offer a mutator are set through it,
        # all others through the `set` method of the field
        if hasattr(field, "getMutator"):
            mutator = field.getMutator(obj)
            mapply(mutator, value)
        else:
            field.set(obj, value)
303
304
305
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    :param name: The name of the tool, e.g. `senaite_setup_catalog`
    :type name: string
    :param context: Optional object used for the lookup of the tool
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned if the tool was not found. A string is
        taken as the name of another tool to look up instead
    :returns: Portal Tool
    :raises APIError: if the tool was not found and no default was given
    """
    if context is not None:
        # A context was given: try to get the tool from there first
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (APIError, AttributeError) as exc:
            # https://github.com/senaite/bika.lims/issues/396
            message = (
                "get_tool::getToolByName({}, '{}') failed: {} "
                "-> falling back to plone.api.portal.get_tool('{}')"
            ).format(repr(context), name, repr(exc), name)
            logger.warn(message)
            # retry without the context
            return get_tool(name, default=default)

    # No context: ask the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is _marker:
            fail("No tool named '%s' found." % name)
        if isinstance(default, six.string_types):
            # the default is the name of a fallback tool
            return get_tool(default)
        return default
336
337
338
def fail(msg=None):
    """Raise an APIError with the given message

    :param msg: The error message. A generic text is used if None
    :raises APIError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise APIError("{}".format(reason))
344
345
346
def is_object(brain_or_object):
    """Check if the passed in object is a supported portal content object

    Supported are the portal root, supermodels, AT contents, dexterity
    contents and catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    """
    # evaluated in this order, stops at the first match
    checks = (
        is_portal,
        is_supermodel,
        is_at_content,
        is_dexterity_content,
        is_brain,
    )
    return any(check(brain_or_object) for check in checks)
364
365
366
def get_object(brain_object_uid, default=_marker):
    """Get the full content object

    :param brain_object_uid: A catalog brain or content object or uid
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
    /CatalogBrain/basestring
    :param default: Value returned if the object cannot be resolved
    :returns: The full object
    :raises APIError: if the object cannot be resolved and no default was
        given
    """
    if is_uid(brain_object_uid):
        # hand over the default, so that an UID without a matching object
        # honours it as well instead of always failing
        return get_object_by_uid(brain_object_uid, default=default)
    if is_supermodel(brain_object_uid):
        # supermodels keep the wrapped content in `instance`
        return brain_object_uid.instance
    if not is_object(brain_object_uid):
        if default is _marker:
            fail("{} is not supported.".format(repr(brain_object_uid)))
        return default
    if is_brain(brain_object_uid):
        return brain_object_uid.getObject()
    return brain_object_uid
385
386
387
def is_portal(brain_or_object):
    """Tell whether the passed in object is the portal root object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISiteRoot`
    :rtype: bool
    """
    provided = ISiteRoot.providedBy(brain_or_object)
    return provided
396
397
398
def is_brain(brain_or_object):
    """Tell whether the passed in object is a catalog brain

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ICatalogBrain`
    :rtype: bool
    """
    provided = ICatalogBrain.providedBy(brain_or_object)
    return provided
407
408
409
def is_supermodel(brain_or_object):
    """Tell whether the passed in object is a supermodel

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `ISuperModel`
    :rtype: bool
    """
    # imported here to avoid circular imports
    from senaite.app.supermodel.interfaces import ISuperModel
    provided = ISuperModel.providedBy(brain_or_object)
    return provided
420
421
422
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object is a dexterity content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object provides `IDexterityContent`
    :rtype: bool
    """
    provided = IDexterityContent.providedBy(brain_or_object)
    return provided
431
432
433
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an AT content type

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an instance of the Archetypes
        `BaseObject`
    :rtype: bool
    """
    at_based = isinstance(brain_or_object, BaseObject)
    return at_based
442
443
444
def is_dx_type(portal_type):
    """Tell whether the portal type is DX based

    A type counts as DX based if its FTI has no `product` set.

    :param portal_type: The portal type name to check
    :returns: True if the portal type is DX based
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    return not fti.product
455
456
457
def is_at_type(portal_type):
    """Tell whether the portal type is AT based

    :param portal_type: The portal type name to check
    :returns: True if the portal type is not DX based
    """
    dx_based = is_dx_type(portal_type)
    return not dx_based
464
465
466
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute of the passed in object wins (it is called
    if callable). Otherwise the full object is checked for `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))
    flag = brain_or_object.is_folderish
    return flag() if callable(flag) else flag
479
480
481
def get_portal_type(brain_or_object):
    """Return the portal type of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises APIError: if the passed in object is not supported
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
    return brain_or_object.portal_type
492
493
494
def get_schema(brain_or_object):
    """Return the schema of the content

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises APIError: for the portal root and for objects that are neither
        dexterity nor AT contents
    """
    content = get_object(brain_or_object)
    if is_portal(content):
        fail("get_schema can't return schema of portal root")
    if is_dexterity_content(content):
        # dexterity: the schema is looked up from the FTI
        type_info = get_tool("portal_types").getTypeInfo(content.portal_type)
        return type_info.lookupSchema()
    if is_at_content(content):
        # AT: the object knows its schema
        return content.Schema()
    fail("{} has no Schema.".format(brain_or_object))
511
512
513
def get_behaviors(portal_type):
    """Return the behaviors registered in the FTI of the portal type

    :param portal_type: DX portal type name
    :returns: The `behaviors` of the FTI
    :raises TypeError: if the FTI has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if not fti.product:
        return fti.behaviors
    raise TypeError("Expected DX type, got AT type instead.")
523
524
525
def enable_behavior(portal_type, behavior_id):
    """Add the behavior to the FTI of the portal type

    Does nothing if the behavior is already listed.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to enable
    :raises TypeError: if the FTI has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")

    if behavior_id in fti.behaviors:
        return

    fti.behaviors += (behavior_id, )
    # invalidate schema cache
    notify(SchemaInvalidatedEvent(portal_type))
540
541
542
def disable_behavior(portal_type, behavior_id):
    """Remove the behavior from the FTI of the portal type

    The schema invalidation event is sent in any case, also if the behavior
    was not listed.

    :param portal_type: DX portal type name
    :param behavior_id: The behavior to disable
    :raises TypeError: if the FTI has a `product` set (AT type)
    """
    fti = get_tool("portal_types").getTypeInfo(portal_type)
    if fti.product:
        raise TypeError("Expected DX type, got AT type instead.")
    remaining = tuple(b for b in fti.behaviors if b != behavior_id)
    fti.behaviors = remaining
    # invalidate schema cache
    notify(SchemaInvalidatedEvent(portal_type))
555
556
557
def get_fields(brain_or_object):
    """Return a name to field mapping of the object

    For dexterity contents, the fields of the schema are followed by the
    fields of the interfaces of all enumerated behaviors.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of name -> field
    :rtype: OrderedDict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)
    if not is_dexterity_content(content):
        return OrderedDict(schema)

    # fields directly provided by the schema interface
    name_field_pairs = getFieldsInOrder(schema)
    # followed by the fields coming from behaviors
    assignable = IBehaviorAssignable(content)
    if assignable:
        for behavior in assignable.enumerateBehaviors():
            name_field_pairs.extend(getFieldsInOrder(behavior.interface))
    return OrderedDict(name_field_pairs)
578
579
580
def get_id(brain_or_object):
    """Return the Plone ID of the passed in object

    For brains, the `getId` or `id` attribute is used if available, so that
    the full object is not loaded.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        for attr in ("getId", "id"):
            if base_hasattr(brain_or_object, attr):
                return getattr(brain_or_object, attr)
    return get_object(brain_or_object).getId()
594
595
596
def get_title(brain_or_object):
    """Return the Title of the passed in object

    For brains, the `Title` attribute is used if available, so that the full
    object is not loaded.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    if not is_brain(brain_or_object):
        return get_object(brain_or_object).Title()
    if not base_hasattr(brain_or_object, "Title"):
        return get_object(brain_or_object).Title()
    return brain_or_object.Title
607
608
609
def get_description(brain_or_object):
    """Return the Description of the passed in object

    For brains, the `Description` attribute is used if available, so that
    the full object is not loaded.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    if not is_brain(brain_or_object):
        return get_object(brain_or_object).Description()
    if not base_hasattr(brain_or_object, "Description"):
        return get_object(brain_or_object).Description()
    return brain_or_object.Description
621
622
623
def get_uid(brain_or_object):
    """Return the Plone UID of the passed in object

    An UID is returned unchanged and the portal root has the UID '0'.

    :param brain_or_object: A single catalog brain or content object or an UID
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_uid(brain_or_object):
        return brain_or_object
    if is_portal(brain_or_object):
        return '0'
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "UID")
    if from_brain:
        return brain_or_object.UID
    return get_object(brain_or_object).UID()
638
639
640
def get_url(brain_or_object):
    """Return the absolute URL of the passed in object

    For brains, `getURL` is used if available, so that the full object is
    not loaded.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getURL")
    if from_brain:
        return brain_or_object.getURL()
    return get_object(brain_or_object).absolute_url()
651
652
653
def get_icon(brain_or_object, html_tag=True):
    """Get the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url. An
        empty string if the type has no icon
    :rtype: string
    """
    # available in the standard library of Python 2 and 3
    from xml.sax.saxutils import escape

    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url

    # The title is free text and ends up inside a double-quoted HTML
    # attribute: escape `&`, `<`, `>` and `"` so it can neither break the
    # markup nor inject additional attributes or tags. Non-string titles are
    # formatted as they are.
    title = get_title(brain_or_object)
    if isinstance(title, six.string_types):
        title = escape(title, {'"': "&quot;"})

    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
        url=url, title=title)
    return tag
677
678
679
def get_object_by_uid(uid, default=_marker):
    """Look up an object by the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if the UID is empty or no object was found
    :returns: Found Object, or the default if one was given
    :raises APIError: if the UID is empty or no object was found, and no
        default was given
    """
    has_default = default is not _marker

    # an empty UID cannot be resolved
    if not uid:
        if has_default:
            return default
        fail("get_object_by_uid requires UID as first argument; got {} instead"
             .format(uid))

    # the UID '0' is reserved for the portal object
    if uid == '0':
        return get_portal()

    brain = get_brain_by_uid(uid)
    if brain is not None:
        return get_object(brain)

    if has_default:
        return default
    fail("No object found for UID {}".format(uid))
706
707
708
def get_brain_by_uid(uid, default=None):
    """Look up the catalog brain for the given UID

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value returned if `uid` is not an UID or the UID catalog
        does not return exactly one brain
    :returns: ZCatalog brain or the default
    """
    if not is_uid(uid):
        return default

    # query the UID catalog
    uid_catalog = get_tool(UID_CATALOG)
    results = uid_catalog(UID=uid)

    # only an unambiguous match counts
    return results[0] if len(results) == 1 else default
726
727
728
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path (or the absolute URL) of the object to find
    :type path: string
    :param default: Value returned if the path is empty, lies outside of the
        portal or cannot be traversed
    :returns: Found Object. If the path cannot be traversed, the default is
        returned, or None if no default was given
    :raises APIError: if the path is empty or lies outside of the portal, and
        no default was given
    """

    # nothing to do here
    if not path:
        if default is not _marker:
            return default
        fail("get_object_by_path first argument must be a path; {} received"
             .format(path))

    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # ensure we have a physical path
    if path.startswith(portal_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if default is not _marker:
            return default
        fail("Not a physical path inside the portal.")

    if path == portal_path:
        return portal

    # Never hand the private sentinel over to the traversal: it would be
    # returned to the caller as if it was the found object
    fallback = None if default is _marker else default
    return portal.restrictedTraverse(path, fallback)
761
762
763
def get_path(brain_or_object):
    """Return the physical path of the passed in object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
774
775
776
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the passed in object

    The parent path of the portal root is the path of the portal itself.
    For brains, the path is cut at the last slash, so that the full object
    is not loaded.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if is_brain(brain_or_object):
        parent_path, _, _ = get_path(brain_or_object).rpartition("/")
        return parent_path
    parent = get_object(brain_or_object).aq_parent
    return get_path(parent)
790
791
792
def get_parent(brain_or_object, **kw):
    """Return the parent object of the content/catalog brain

    The parent of the portal root is the portal itself.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param kw: No longer supported. A warning is logged if any keyword is
        passed (e.g. the removed `catalog_search`)
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """
    if is_portal(brain_or_object):
        return get_portal()

    # BBB: removed `catalog_search` keyword
    if kw:
        logger.warn("API function `get_parent` no longer support keywords.")

    content = get_object(brain_or_object)
    return content.aq_parent
811
812
813
def search(query, catalog=_marker):
    """Search for objects in a single catalog.

    If no catalog is given, the first catalog registered for each queried
    portal type is used, and the UID catalog if the query contains no portal
    type.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises APIError: if the query is not a dictionary or more than one
        catalog would have to be searched
    """
    # query needs to be a dictionary
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # Queried portal types, always handled as a list
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is _marker:
        # No catalog specified: take the first registered/default catalog of
        # each queried portal type
        catalogs = [
            get_catalogs_for(portal_type, default=UID_CATALOG)[0]
            for portal_type in portal_types
        ]
    elif isinstance(catalog, (list, tuple)):
        # Several user defined catalogs
        catalogs = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        # A single user defined catalog
        catalogs = [get_tool(catalog)]

    # Drop duplicates and fall back to the UID catalog if nothing is left
    catalogs = list(set(catalogs)) or [get_uid_catalog()]

    # We only support **single** catalog queries
    if len(catalogs) > 1:
        fail("Multi Catalog Queries are not supported!")

    target = catalogs[0]
    return target(query)
859
860
861
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of the attribute, called if it is callable

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Value returned if the attribute is missing or its access
        is not authorized
    :returns: Attribute value
    :rtype: obj
    :raises APIError: if the attribute is missing or its access is not
        authorized, and no default was given
    """
    has_default = default is not _marker
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        return value() if callable(value) else value
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
885
886
887
def get_uid_catalog():
    """Look up the UID catalog tool

    :returns: The tool registered under `UID_CATALOG`
    """
    uid_catalog = get_tool(UID_CATALOG)
    return uid_catalog
893
894
895
def get_portal_catalog():
    """Look up the portal catalog tool

    :returns: The tool registered under `PORTAL_CATALOG`
    """
    portal_catalog = get_tool(PORTAL_CATALOG)
    return portal_catalog
901
902
903
def get_review_history(brain_or_object, rev=True):
    """Return the workflow review history of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: When `True`, the history is reversed before it is returned
    :type rev: bool
    :returns: Workflow history, empty if it could not be retrieved
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    history = []
    try:
        history = get_tool("portal_workflow").getInfoFor(
            obj, 'review_history')
    except WorkflowException as e:
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, str(e)))

    if isinstance(history, tuple):
        # Products.CMFDefault.DefaultWorkflow.getInfoFor always returns a
        # tuple when "review_history" is passed in:
        # https://github.com/zopefoundation/Products.CMFDefault/blob/master/Products/CMFDefault/DefaultWorkflow.py#L244
        #
        # On the other hand, Products.DCWorkflow.getInfoFor relies on
        # Expression.StateChangeInfo, that always returns a list, except when
        # no review_history is found:
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/DCWorkflow.py#L310
        # https://github.com/zopefoundation/Products.DCWorkflow/blob/master/Products/DCWorkflow/Expression.py#L94
        history = list(history)
    elif not isinstance(history, list):
        # neither list nor tuple: log it and fall back to an empty history
        logger.error("get_review_history: expected list, received {}".format(
            history))
        history = []

    if rev is True:
        history.reverse()
    return history
940
941
942
def get_revision_history(brain_or_object):
    """Return the full content history for the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory` for the object
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # the request is optional: fall back to None if it cannot be read
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
953
954
955
def get_workflows_for(brain_or_object):
    """Return the workflow chain assigned to the given brain or context.

    Note: A portal_type name (string) is accepted as well.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    if isinstance(brain_or_object, six.string_types):
        # strings are passed on as they are (portal_type lookup)
        target = brain_or_object
    else:
        target = get_object(brain_or_object)
    return wf_tool.getChainFor(target)
970
971
972
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the current workflow status of the given brain or context.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status, or an empty string if the workflow tool has none
    :rtype: str
    """
    # Prefer the value stored on the brain, if the brain carries that column
    if is_brain(brain_or_object) and state_var in brain_or_object.schema():
        return brain_or_object[state_var]

    # Otherwise ask the workflow tool for the state of the object
    wf_tool = get_tool("portal_workflow")
    obj = get_object(brain_or_object)
    return wf_tool.getInfoFor(ob=obj, name=state_var, default='')
991
992
993
def get_previous_worfklow_status_of(brain_or_object, skip=None, default=None):
    """Return the previous workflow status of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param skip: Workflow states to skip
    :type skip: tuple/list
    :param default: Value returned when no previous status is found
    :returns: status
    :rtype: str
    """
    if not (isinstance(skip, (list, tuple)) and skip):
        skip = []

    history = get_review_history(brain_or_object)

    # Collapse consecutive duplicates, some transitions might happen more
    # than once consecutively (e.g. publish)
    entries = [entry for entry, _group in groupby(history)]

    # the first entry is the current one, so start looking after it
    for entry in entries[1:]:
        status = entry.get("review_state")
        if status not in skip:
            return status
    return default
1020
1021
1022
def get_creation_date(brain_or_object):
    """Return the creation date of the brain or object

    Reads the `created` attribute and calls it when it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    """
    creation = getattr(brain_or_object, "created", None)
    if creation is None:
        fail("Object {} has no creation date ".format(
             repr(brain_or_object)))
    return creation() if callable(creation) else creation
1037
1038
1039
def get_modification_date(brain_or_object):
    """Return the modification date of the brain or object

    Reads the `modified` attribute and calls it when it is callable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    modification = getattr(brain_or_object, "modified", None)
    if modification is None:
        fail("Object {} has no modification date ".format(
             repr(brain_or_object)))
    return modification() if callable(modification) else modification
1054
1055
1056
def get_review_status(brain_or_object):
    """Return the `review_state` of an object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_status variable
    :rtype: String
    """
    on_brain = is_brain(brain_or_object) and base_hasattr(
        brain_or_object, "review_state")
    if not on_brain:
        # no brain attribute available: use the workflow status lookup
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
1068
1069
1070
def is_active(brain_or_object):
    """Check that the object is neither 'inactive' nor 'cancelled'.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    inactive_states = ["cancelled", "inactive"]
    return get_review_status(brain_or_object) not in inactive_states
1081
1082
1083
def get_catalogs_for(brain_or_object, default=PORTAL_CATALOG):
    """Return the catalogs registered for the given portal_type, catalog
    brain or content object

    NOTE: We pass in the `portal_catalog` as default in subsequent calls to
          work around the missing `uid_catalog` during snapshot creation when
          installing a fresh site!

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Name of the catalog used when none is registered
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", default=None)
    if at_tool is None:
        # no archetype tool to ask: only the default catalog is returned
        return [get_tool(default, default=PORTAL_CATALOG)]

    registered = []

    # resolve the portal_type of objects/brains, take strings as they are
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        registered = at_tool.getCatalogsByType(portal_type)
    if isinstance(brain_or_object, six.string_types):
        registered = at_tool.getCatalogsByType(brain_or_object)

    return registered or [get_tool(default, default=PORTAL_CATALOG)]
1113
1114
1115
def get_transitions_for(brain_or_object):
    """List the transitions offered by all workflows of the object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    obj = get_object(brain_or_object)
    for wf_id in get_workflows_for(brain_or_object):
        offered = wf_tool[wf_id].getTransitionsFor(obj)
        # only add transitions not already collected from earlier workflows
        unseen = [item for item in offered if item not in collected]
        collected += unseen
    return collected
1131
1132
1133
def do_transition_for(brain_or_object, transition):
    """Perform a workflow transition on the passed in object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: Id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, six.string_types):
        fail("Transition type needs to be string, got '%s'" % type(transition))
    obj = get_object(brain_or_object)
    try:
        ploneapi.content.transition(obj, transition)
    except ploneapi.exc.InvalidParameterError as exc:
        reason = str(exc)
        fail("Failed to perform transition '{}' on {}: {}".format(
             transition, obj, reason))
    return obj
1149
1150
1151
def get_roles_for_permission(permission, brain_or_object):
    """Return the sorted roles granted the given permission on the object.

    :param permission: The permission to look up
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    obj = get_object(brain_or_object)
    roles = rolesForPermissionOn(permission, obj)
    # drop duplicates before sorting
    return sorted(set(roles))
1162
1163
1164
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object must support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honour the requested policy; the default keeps the former behavior
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
1176
1177
1178
def get_version(brain_or_object):
    """Return the version of the current object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object, or None if not available
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        # read from the unwrapped object, 0 if no version_id is set
        return getattr(aq_base(obj), "version_id", 0)
    return None
1190
1191
1192
def get_view(name, context=None, request=None, default=None):
    """Look up a view by name

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view, the portal if omitted
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view, the global request if
        omitted
    :type request: HTTPRequest object
    :param default: Value returned when no view is found
    :returns: The view found for the context and request, or default
    :rtype: Products.Five.metaclass View object
    """
    context = context or get_portal()
    request = request or get_request() or None
    obj = get_object(context)
    view = queryMultiAdapter((obj, request), name=name)
    return default if view is None else view
1210
1211
1212
def get_request():
    """Return the global request object

    :returns: Result of `globalrequest.getRequest`
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
1219
1220
1221
def get_test_request():
    """Create a TestRequest that directly provides IAttributeAnnotatable

    :returns: The new TestRequest object
    """
    test_request = TestRequest()
    directlyProvides(test_request, IAttributeAnnotatable)
    return test_request
1227
1228
1229
def get_group(group_or_groupname):
    """Return Plone Group

    :param group_or_groupname: Plone group or the name of the group
    :type groupname:  GroupData/str
    :returns: Plone GroupData, or None for an empty argument
    """
    if not group_or_groupname:
        return None
    if not hasattr(group_or_groupname, "_getGroup"):
        # not a group object: look it up by id in the groups tool
        return get_tool("portal_groups").getGroupById(group_or_groupname)
    return group_or_groupname
1243
1244
1245
def get_user(user_or_username):
    """Return Plone User

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None for any other type of argument
    """
    if isinstance(user_or_username, six.string_types):
        return get_member_by_login_name(get_portal(), user_or_username, False)
    if isinstance(user_or_username, MemberData):
        return user_or_username
    return None
1257
1258
1259
def get_user_properties(user_or_username):
    """Return the merged properties of all property sheets of a user

    :param user_or_username: Plone user or user id
    :returns: Dict of property name -> value, empty if no user is found
    :rtype: dict
    """
    user = get_user(user_or_username)
    if user is None or not callable(user.getUser):
        return {}
    plone_user = user.getUser()
    properties = {}
    # later sheets overwrite equally named properties of earlier ones
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
1276
1277
1278
def get_users_by_roles(roles=None):
    """Search Plone users by their roles

    :param roles: Plone role name or list of roles
    :type roles:  list/str
    :returns: List of Plone users having the role(s)
    """
    if isinstance(roles, (tuple, list)):
        wanted = roles
    else:
        # a single value is wrapped into a list
        wanted = [roles]
    return get_tool("portal_membership").searchForMembers(roles=wanted)
1289
1290
1291
def get_current_user():
    """Return the current logged in user

    :returns: Result of `plone.api.user.get_current`
    """
    current = ploneapi.user.get_current()
    return current
1297
1298
1299
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Return the contact associated to a Plone user

    Returns None if the user has no contact, or more than one contact,
    associated. The `contact_types` parameter filters the portal types for
    the search.

    :param: Plone user
    :contact_types: List with the contact portal types to search
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    from senaite.core.catalog import CONTACT_CATALOG  # Avoid circular import
    brains = search(
        {"portal_type": contact_types, "getUsername": user.id},
        catalog=CONTACT_CATALOG)
    if not brains:
        return None

    if len(brains) > 1:
        # Ambiguous: the user is bound to several contacts, log and give up
        titles = ','.join([brain.Title for brain in brains])
        logger.error("User '{}' is bound to multiple Contacts '{}'".format(
            user.id, titles))
        return None

    return get_object(brains[0])
1327
1328
1329
def get_user_client(user_or_contact):
    """Return the client of the contact of a Plone user

    Returns None if the user passed in has no contact or does not belong to
    any client.

    :param: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        # the parent of a contact is returned only if it is a client
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # A user was passed in: resolve its contact first
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if IContact.providedBy(contact):
        return get_user_client(contact)
    return None
1353
1354
1355
def get_current_client():
    """Return the client the current logged in user belongs to, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
1361
1362
1363
def get_cache_key(brain_or_object):
    """Build a cache key for a common brain or object

    The key joins portal type, id, UID, URL and modification time with "-".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # handle different domains gracefully
        get_url(brain_or_object),
        # Return the microsecond since the epoch in GMT
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join([str(part) for part in parts])
1381
1382
1383
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object

    `method` and `self` are accepted but not used for the key.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
1394
1395
1396
def normalize_id(string):
    """Normalize the id with the registered `IIDNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    id_normalizer = getUtility(IIDNormalizer)
    return id_normalizer.normalize(string)
1410
1411
1412
def normalize_filename(string):
    """Normalize the filename with the registered `IFileNameNormalizer`
    utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, six.string_types):
        fail("Type of argument must be string, found '{}'"
             .format(type(string)))
    filename_normalizer = getUtility(IFileNameNormalizer)
    return filename_normalizer.normalize(string)
1426
1427
1428
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    The string '0' is always accepted. Any other value must be a string of
    32 characters that matches `UID_RX`.

    :param uid: The uid to check
    :param validate: If False, only checks the format of the uid. If True,
    also verifies that a brain exists for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    if not isinstance(uid, six.string_types):
        return False
    if uid == '0':
        return True
    if len(uid) != 32 or not UID_RX.match(uid):
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    matches = get_tool('uid_catalog')(UID=uid)
    if matches:
        assert (len(matches) == 1)
    return len(matches) > 0
1455
1456
1457
def is_date(date):
    """Checks if the passed in value is a Zope DateTime or a Python datetime

    :param date: The date to check
    :type date: DateTime or datetime
    :return: True if a valid date, False for falsy values and other types
    :rtype: bool
    """
    return bool(date) and isinstance(date, (DateTime, datetime))
1468
1469
1470
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    Deprecated: emits a DeprecationWarning on every call and delegates the
    conversion to `senaite.core.api.dtime.to_DT`.

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Converted and returned when `value` gives no date
    :return: The DateTime representation of the value passed in or default
    """

    # cannot use bika.lims.deprecated (circular dependencies)
    import warnings
    # "always" makes sure the warning is shown on each call, the filter is
    # set to "default" again right after
    warnings.simplefilter("always", DeprecationWarning)
    warnings.warn("Deprecated: use senaite.core.api.dtime.to_DT instead",
                  category=DeprecationWarning, stacklevel=2)
    warnings.simplefilter("default", DeprecationWarning)

    # prevent circular dependencies
    from senaite.core.api.dtime import to_DT
    converted = to_DT(value)
    return converted if converted else to_DT(default)
1491
1492
1493
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Returns the computed total number of minutes

    All arguments are converted with `float` first.

    :param round_to_int: Round the total and return it as int if True,
        otherwise return the float total
    :returns: Total number of minutes
    """
    total = (
        float(days) * 24 * 60
        + float(hours) * 60
        + float(minutes)
        + float(seconds) / 60
        + float(milliseconds) / 1000 / 60
    )
    if round_to_int:
        return int(round(total))
    return total
1500
1501
1502
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Returns a representation of time in a string in xd yh zm format

    Parts with a value of zero are left out, except for the hours when both
    the days and the minutes part are present.
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    delta = timedelta(minutes=int(round(total)))
    day_count = delta.days
    hour_count, leftover = divmod(delta.seconds, 3600)
    minute_count = leftover // 60

    day_part = "{}d ".format(str(day_count)) if day_count else ""
    minute_part = "{}m ".format(str(minute_count)) if minute_count else ""
    # keep "0h" only if it sits between a days and a minutes part
    if hour_count or (day_part and minute_part):
        hour_part = "{}h ".format(str(hour_count))
    else:
        hour_part = ""
    return (day_part + hour_part + minute_part).strip()
1518
1519
1520
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    :param value: The value to be converted to an int
    :param default: Fallback used when the conversion fails. `None` is
        returned as it is, any other value is converted with `to_int` itself.
        Without a default, `fail` is called
    :return: The resulting int or default
    """
    if is_floatable(value):
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # Infinite floats (e.g. "inf") pass the floatable check but make
        # int() raise OverflowError, while NaN raises ValueError. Both must
        # end up in the default handling.
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
1537
1538
1539
def is_floatable(value):
    """Checks if the passed in value can be converted with `float`

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if is a valid float number
    :rtype: bool"""
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    return True
1551
1552
1553
def to_float(value, default=_marker):
    """Converts the passed in value to a float number

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Converted with `to_float` and returned when `value` is
        not floatable. Without a default, `fail` is called
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    floatable = is_floatable(value)
    if not floatable and default is not _marker:
        return to_float(default)
    if not floatable:
        fail("Value %s is not floatable" % repr(value))
    return float(value)
1566
1567
1568
def float_to_string(value, default=_marker):
    """Convert a float value to string without exponential notation

    This function preserves the whole fraction

    :param value: The float value to be converted to a string
    :type value: str, float, int
    :param default: Returned as it is when `value` is not floatable. Without
        a default, `fail` is called
    :returns: String representation of the float w/o exponential notation
    :rtype: str
    """
    if not is_floatable(value):
        if default is not _marker:
            return default
        fail("Value %s is not floatable" % repr(value))

    # Leave floatable string values unchanged
    if isinstance(value, six.string_types):
        return value

    value = float(value)
    text = str(value)

    # Part after the decimal point, e.g. "23e-26" for 1.23e-26, or the
    # whole text if there is no point, e.g. "1e-07" for 0.0000001
    tail = text.rpartition(".")[2]

    if "e-" in tail:
        digits, _sep, exponent = tail.partition("e-")
        # we want to cover the fraction and the zeros
        precision = len(digits) + int(exponent)
        text = ("{:.%df}" % precision).format(value)
    elif "e+" in tail:
        # positive numbers, e.g. 1e+16 don't need a fractional part
        text = "{:.0f}".format(value)

    # cut off trailing zeros
    if "." in text:
        text = text.rstrip("0").rstrip(".")

    return text
1612
1613
1614
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Empty values, `Missing.Value`, UIDs and booleans give an empty string.
    Lists, tuples and dicts are converted item by item and joined with a
    space.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value or an empty string
    """
    if (not value or value is Missing.Value or is_uid(value)
            or isinstance(value, (bool))):
        return u""
    if isinstance(value, (list, tuple)):
        texts = [to_searchable_text_metadata(item) for item in value]
        # leave out items that gave no text
        return " ".join([text for text in texts if text])
    if isinstance(value, dict):
        return to_searchable_text_metadata(value.values())
    if is_date(value):
        from senaite.core.api.dtime import date_to_string
        return date_to_string(value, "%Y-%m-%d")
    if is_at_content(value):
        return to_searchable_text_metadata(get_title(value))
    if not isinstance(value, six.string_types):
        value = str(value)
    return safe_unicode(value)
1642
1643
1644
def get_registry_record(name, default=None):
    """Return the value of a registry record

    :param name: [required] name of the registry record
    :type name: str
    :param default: The value returned if the record is not found
    :type default: anything
    :returns: value of the registry record
    """
    record = ploneapi.portal.get_registry_record(name, default=default)
    return record
1654
1655
1656
def to_display_list(pairs, sort_by="key", allow_empty=True):
    """Create a Plone DisplayList from list items

    :param pairs: list of key, value pairs, a single pair or a single string
    :param sort_by: Sort the items either by "key" or "value", any other
        value leaves the items unsorted
    :param allow_empty: Allow to select an empty value
    :returns: Plone DisplayList
    """
    display_list = DisplayList()

    # a single string is used as key and as value
    if isinstance(pairs, six.string_types):
        pairs = [pairs, pairs]

    for item in pairs:
        if isinstance(item, (tuple, list)):
            # pairs is a list of lists -> add each pair
            display_list.add(*item)
        elif isinstance(item, six.string_types):
            # pairs is just a single pair -> add it and stop
            display_list.add(*pairs)
            break

    # add the empty option
    if allow_empty:
        display_list.add("", "")

    # sort by key/value
    if sort_by == "key":
        return display_list.sortedByKey()
    if sort_by == "value":
        return display_list.sortedByValue()
    return display_list
1688
1689
1690
def text_to_html(text, wrap="p", encoding="utf8"):
    """Convert newline characters in the text to HTML line breaks

    :param text: Plain text to convert
    :param wrap: Name of the tag to wrap the text in, no wrapping if empty
    :param encoding: Encoding of the returned HTML
    :returns: HTML converted and encoded text
    """
    if not text:
        return ""
    # handle text internally as unicode and replace the newline characters
    html = safe_unicode(text).replace("\n", "<br/>")
    if wrap:
        html = u"<{tag}>{html}</{tag}>".format(tag=wrap, html=html)
    # return encoded html
    return html.encode(encoding)
1708
1709
1710
def to_utf8(string, default=_marker):
    """Encode string to UTF8

    :param string: String to be encoded to UTF8
    :param default: Returned when `string` is not a string type. Without a
        default, `fail` is called
    :returns: UTF8 encoded string
    """
    if isinstance(string, six.string_types):
        return safe_unicode(string).encode("utf8")
    if default is _marker:
        fail("Expected string type, got '%s'" % type(string))
    return default
1721
1722
1723
def is_temporary(obj):
    """Returns whether the given object is temporary or not

    :param obj: the object to evaluate
    :returns: True if the object is temporary
    """
    def has_temporary_id(thing):
        # no id at all, or an id that matches the UID pattern
        thing_id = getattr(aq_base(thing), "id", None)
        return thing_id is None or UID_RX.match(thing_id)

    if ITemporaryObject.providedBy(obj):
        return True

    if has_temporary_id(obj):
        return True

    container = aq_parent(aq_inner(obj))
    if not container or has_temporary_id(container):
        return True

    # Checks to see if we are created inside the portal_factory.
    # This might also happen for DX types in senaite.databox!
    if getattr(aq_base(container), "meta_type", "") == "TempFolder":
        return True

    return False
1751
1752
1753
def mark_temporary(brain_or_object):
    """Mark the object as temporary

    Makes the object provide `ITemporaryObject`.

    :param brain_or_object: A single catalog brain or content object
    """
    alsoProvides(get_object(brain_or_object), ITemporaryObject)
1758
1759
1760
def unmark_temporary(brain_or_object):
    """Unmark the object as temporary

    Makes the object no longer provide `ITemporaryObject`.

    :param brain_or_object: A single catalog brain or content object
    """
    noLongerProvides(get_object(brain_or_object), ITemporaryObject)
1765
1766
1767
def is_string(thing):
    """Checks if the passed in object is one of `six.string_types`

    :param thing: object to test
    :returns: True if the object is a string
    """
    string_types = six.string_types
    return isinstance(thing, string_types)
1774
1775
1776
def parse_json(thing, default=""):
    """Parse from JSON

    :param thing: thing to parse
    :param default: value to return if cannot parse
    :returns: the object representing the JSON or default
    """
    try:
        parsed = json.loads(thing)
    except (TypeError, ValueError):
        # not a string at all, or not valid JSON
        return default
    return parsed
1787
1788
1789
def to_list(value):
    """Converts the value to a list

    Strings holding a JSON list are parsed first. Lists, tuples and sets
    are copied into a new list, any other value is wrapped in a list.

    :param value: the value to be represented as a list
    :returns: a list that represents or contains the value
    """
    sequence_types = (list, tuple, set)
    if is_string(value):
        parsed = parse_json(value)
        if isinstance(parsed, sequence_types):
            return list(parsed)
    if isinstance(value, sequence_types):
        return list(value)
    return [value]
1802