Passed
Push — master ( aa1c7b...33392c )
by Jordi
04:39
created

bika.lims.api.search()   B

Complexity

Conditions 7

Size

Total Lines 46
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 46
rs 8
c 0
b 0
f 0
cc 7
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import re
9
from datetime import datetime
10
from datetime import timedelta
11
12
import Missing
13
from AccessControl.PermissionRole import rolesForPermissionOn
14
from Acquisition import aq_base
15
from bika.lims import logger
16
from bika.lims.interfaces import IClient
17
from bika.lims.interfaces import IContact
18
from bika.lims.interfaces import ILabContact
19
from DateTime import DateTime
20
from DateTime.interfaces import DateTimeError
21
from plone import api as ploneapi
22
from plone.api.exc import InvalidParameterError
23
from plone.app.layout.viewlets.content import ContentHistoryView
24
from plone.dexterity.interfaces import IDexterityContent
25
from plone.i18n.normalizer.interfaces import IFileNameNormalizer
26
from plone.i18n.normalizer.interfaces import IIDNormalizer
27
from plone.memoize.volatile import DontCache
28
from Products.Archetypes.atapi import DisplayList
29
from Products.Archetypes.BaseObject import BaseObject
30
from Products.CMFCore.interfaces import IFolderish
31
from Products.CMFCore.interfaces import ISiteRoot
32
from Products.CMFCore.utils import getToolByName
33
from Products.CMFCore.WorkflowCore import WorkflowException
34
from Products.CMFPlone.utils import _createObjectByType
35
from Products.CMFPlone.utils import base_hasattr
36
from Products.CMFPlone.utils import safe_unicode
37
from Products.ZCatalog.interfaces import ICatalogBrain
38
from zope import globalrequest
39
from zope.component import getMultiAdapter
40
from zope.component import getUtility
41
from zope.component.interfaces import IFactory
42
from zope.event import notify
43
from zope.lifecycleevent import ObjectCreatedEvent
44
from zope.lifecycleevent import modified
45
from zope.security.interfaces import Unauthorized
46
47
"""SENAITE LIMS Framework API
48
49
Please see bika.lims/docs/API.rst for documentation.
50
51
Architecural Notes:
52
53
Please add only functions that do a single thing for a single object.
54
55
Good: `def get_foo(brain_or_object)`
56
Bad:  `def get_foos(list_of_brain_objects)`
57
58
Why?
59
60
Because it makes things more complex. You can always use a pattern like this to
61
achieve the same::
62
63
    >>> foos = map(get_foo, list_of_brain_objects)
64
65
Please add for all of your functions a descriptive test in docs/API.rst.
66
67
Thanks.
68
"""
69
70
_marker = object()
71
72
UID_RX = re.compile("[a-z0-9]{32}$")
73
74
75
class APIError(Exception):
76
    """Base exception class for bika.lims errors."""
77
78
79
def get_portal():
80
    """Get the portal object
81
82
    :returns: Portal object
83
    """
84
    return ploneapi.portal.getSite()
85
86
87
def get_setup():
88
    """Fetch the `bika_setup` folder.
89
    """
90
    portal = get_portal()
91
    return portal.get("bika_setup")
92
93
94
def get_bika_setup():
95
    """Fetch the `bika_setup` folder.
96
    """
97
    return get_setup()
98
99
100
def create(container, portal_type, *args, **kwargs):
101
    """Creates an object in Bika LIMS
102
103
    This code uses most of the parts from the TypesTool
104
    see: `Products.CMFCore.TypesTool._constructInstance`
105
106
    :param container: container
107
    :type container: ATContentType/DexterityContentType/CatalogBrain
108
    :param portal_type: The portal type to create, e.g. "Client"
109
    :type portal_type: string
110
    :param title: The title for the new content object
111
    :type title: string
112
    :returns: The new created object
113
    """
114
    from bika.lims.utils import tmpID
115
    if kwargs.get("title") is None:
116
        kwargs["title"] = "New {}".format(portal_type)
117
118
    # generate a temporary ID
119
    tmp_id = tmpID()
120
121
    # get the fti
122
    types_tool = get_tool("portal_types")
123
    fti = types_tool.getTypeInfo(portal_type)
124
125
    if fti.product:
126
        obj = _createObjectByType(portal_type, container, tmp_id)
127
    else:
128
        # newstyle factory
129
        factory = getUtility(IFactory, fti.factory)
130
        obj = factory(tmp_id, *args, **kwargs)
131
        if hasattr(obj, '_setPortalTypeName'):
132
            obj._setPortalTypeName(fti.getId())
133
        notify(ObjectCreatedEvent(obj))
134
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
135
        # ContainerModifiedEvent
136
        container._setObject(tmp_id, obj)
137
        # we get the object here with the current object id, as it might be
138
        # renamed already by an event handler
139
        obj = container._getOb(obj.getId())
140
141
    # handle AT Content
142
    if is_at_content(obj):
143
        obj.processForm()
144
145
    # Edit after processForm; processForm does AT unmarkCreationFlag.
146
    obj.edit(**kwargs)
147
148
    # explicit notification
149
    modified(obj)
150
    return obj
151
152
153
def get_tool(name, context=None, default=_marker):
154
    """Get a portal tool by name
155
156
    :param name: The name of the tool, e.g. `portal_catalog`
157
    :type name: string
158
    :param context: A portal object
159
    :type context: ATContentType/DexterityContentType/CatalogBrain
160
    :returns: Portal Tool
161
    """
162
163
    # Try first with the context
164
    if context is not None:
165
        try:
166
            context = get_object(context)
167
            return getToolByName(context, name)
168
        except (APIError, AttributeError) as e:
169
            # https://github.com/senaite/bika.lims/issues/396
170
            logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
171
                        "-> falling back to plone.api.portal.get_tool('{}')"
172
                        .format(repr(context), name, repr(e), name))
173
            return get_tool(name, default=default)
174
175
    # Try with the plone api
176
    try:
177
        return ploneapi.portal.get_tool(name)
178
    except InvalidParameterError:
179
        if default is not _marker:
180
            return default
181
        fail("No tool named '%s' found." % name)
182
183
184
def fail(msg=None):
185
    """API LIMS Error
186
    """
187
    if msg is None:
188
        msg = "Reason not given."
189
    raise APIError("{}".format(msg))
190
191
192
def is_object(brain_or_object):
193
    """Check if the passed in object is a supported portal content object
194
195
    :param brain_or_object: A single catalog brain or content object
196
    :type brain_or_object: Portal Object
197
    :returns: True if the passed in object is a valid portal content
198
    """
199
    if is_portal(brain_or_object):
200
        return True
201
    if is_at_content(brain_or_object):
202
        return True
203
    if is_dexterity_content(brain_or_object):
204
        return True
205
    if is_brain(brain_or_object):
206
        return True
207
    return False
208
209
210
def get_object(brain_object_uid, default=_marker):
211
    """Get the full content object
212
213
    :param brain_object_uid: A catalog brain or content object or uid
214
    :type brain_object_uid: PortalObject/ATContentType/DexterityContentType
215
    /CatalogBrain/basestring
216
    :returns: The full object
217
    """
218
    if is_uid(brain_object_uid):
219
        return get_object_by_uid(brain_object_uid)
220
    if not is_object(brain_object_uid):
221
        if default is _marker:
222
            fail("{} is not supported.".format(repr(brain_object_uid)))
223
        return default
224
    if is_brain(brain_object_uid):
225
        return brain_object_uid.getObject()
226
    return brain_object_uid
227
228
229
def is_portal(brain_or_object):
230
    """Checks if the passed in object is the portal root object
231
232
    :param brain_or_object: A single catalog brain or content object
233
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
234
    :returns: True if the object is the portal root object
235
    :rtype: bool
236
    """
237
    return ISiteRoot.providedBy(brain_or_object)
238
239
240
def is_brain(brain_or_object):
241
    """Checks if the passed in object is a portal catalog brain
242
243
    :param brain_or_object: A single catalog brain or content object
244
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
245
    :returns: True if the object is a catalog brain
246
    :rtype: bool
247
    """
248
    return ICatalogBrain.providedBy(brain_or_object)
249
250
251
def is_dexterity_content(brain_or_object):
252
    """Checks if the passed in object is a dexterity content type
253
254
    :param brain_or_object: A single catalog brain or content object
255
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
256
    :returns: True if the object is a dexterity content type
257
    :rtype: bool
258
    """
259
    return IDexterityContent.providedBy(brain_or_object)
260
261
262
def is_at_content(brain_or_object):
263
    """Checks if the passed in object is an AT content type
264
265
    :param brain_or_object: A single catalog brain or content object
266
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
267
    :returns: True if the object is an AT content type
268
    :rtype: bool
269
    """
270
    return isinstance(brain_or_object, BaseObject)
271
272
273
def is_folderish(brain_or_object):
274
    """Checks if the passed in object is folderish
275
276
    :param brain_or_object: A single catalog brain or content object
277
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
278
    :returns: True if the object is folderish
279
    :rtype: bool
280
    """
281
    if hasattr(brain_or_object, "is_folderish"):
282
        if callable(brain_or_object.is_folderish):
283
            return brain_or_object.is_folderish()
284
        return brain_or_object.is_folderish
285
    return IFolderish.providedBy(get_object(brain_or_object))
286
287
288
def get_portal_type(brain_or_object):
289
    """Get the portal type for this object
290
291
    :param brain_or_object: A single catalog brain or content object
292
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
293
    :returns: Portal type
294
    :rtype: string
295
    """
296
    if not is_object(brain_or_object):
297
        fail("{} is not supported.".format(repr(brain_or_object)))
298
    return brain_or_object.portal_type
299
300
301
def get_schema(brain_or_object):
302
    """Get the schema of the content
303
304
    :param brain_or_object: A single catalog brain or content object
305
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
306
    :returns: Schema object
307
    """
308
    obj = get_object(brain_or_object)
309
    if is_portal(obj):
310
        fail("get_schema can't return schema of portal root")
311
    if is_dexterity_content(obj):
312
        pt = get_tool("portal_types")
313
        fti = pt.getTypeInfo(obj.portal_type)
314
        return fti.lookupSchema()
315
    if is_at_content(obj):
316
        return obj.Schema()
317
    fail("{} has no Schema.".format(brain_or_object))
318
319
320
def get_fields(brain_or_object):
321
    """Get the list of fields from the object
322
323
    :param brain_or_object: A single catalog brain or content object
324
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
325
    :returns: List of fields
326
    :rtype: list
327
    """
328
    obj = get_object(brain_or_object)
329
    schema = get_schema(obj)
330
    if is_dexterity_content(obj):
331
        names = schema.names()
332
        fields = map(lambda name: schema.get(name), names)
333
        return dict(zip(names, fields))
334
    return dict(zip(schema.keys(), schema.fields()))
335
336
337
def get_id(brain_or_object):
338
    """Get the Plone ID for this object
339
340
    :param brain_or_object: A single catalog brain or content object
341
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
342
    :returns: Plone ID
343
    :rtype: string
344
    """
345
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getId"):
346
        return brain_or_object.getId
347
    return get_object(brain_or_object).getId()
348
349
350
def get_title(brain_or_object):
351
    """Get the Title for this object
352
353
    :param brain_or_object: A single catalog brain or content object
354
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
355
    :returns: Title
356
    :rtype: string
357
    """
358
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "Title"):
359
        return brain_or_object.Title
360
    return get_object(brain_or_object).Title()
361
362
363
def get_description(brain_or_object):
364
    """Get the Title for this object
365
366
    :param brain_or_object: A single catalog brain or content object
367
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
368
    :returns: Title
369
    :rtype: string
370
    """
371
    if is_brain(brain_or_object) \
372
            and base_hasattr(brain_or_object, "Description"):
373
        return brain_or_object.Description
374
    return get_object(brain_or_object).Description()
375
376
377
def get_uid(brain_or_object):
378
    """Get the Plone UID for this object
379
380
    :param brain_or_object: A single catalog brain or content object or an UID
381
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
382
    :returns: Plone UID
383
    :rtype: string
384
    """
385
    if is_uid(brain_or_object):
386
        return brain_or_object
387
    if is_portal(brain_or_object):
388
        return '0'
389
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "UID"):
390
        return brain_or_object.UID
391
    return get_object(brain_or_object).UID()
392
393
394
def get_url(brain_or_object):
395
    """Get the absolute URL for this object
396
397
    :param brain_or_object: A single catalog brain or content object
398
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
399
    :returns: Absolute URL
400
    :rtype: string
401
    """
402
    if is_brain(brain_or_object) and base_hasattr(brain_or_object, "getURL"):
403
        return brain_or_object.getURL()
404
    return get_object(brain_or_object).absolute_url()
405
406
407
def get_icon(brain_or_object, html_tag=True):
408
    """Get the icon of the content object
409
410
    :param brain_or_object: A single catalog brain or content object
411
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
412
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
413
    :type html_tag: bool
414
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url
415
    :rtype: string
416
    """
417
    # Manual approach, because `plone.app.layout.getIcon` does not reliable
418
    # work for Contents coming from other catalogs than the
419
    # `portal_catalog`
420
    portal_types = get_tool("portal_types")
421
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
422
    icon = fti.getIcon()
423
    if not icon:
424
        return ""
425
    url = "%s/%s" % (get_url(get_portal()), icon)
426
    if not html_tag:
427
        return url
428
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
429
        url=url, title=get_title(brain_or_object))
430
    return tag
431
432
433
def get_object_by_uid(uid, default=_marker):
434
    """Find an object by a given UID
435
436
    :param uid: The UID of the object to find
437
    :type uid: string
438
    :returns: Found Object or None
439
    """
440
441
    # nothing to do here
442
    if not uid:
443
        if default is not _marker:
444
            return default
445
        fail("get_object_by_uid requires UID as first argument; got {} instead"
446
             .format(uid))
447
448
    # we defined the portal object UID to be '0'::
449
    if uid == '0':
450
        return get_portal()
451
452
    brain = get_brain_by_uid(uid)
453
454
    if brain is None:
455
        if default is not _marker:
456
            return default
457
        fail("No object found for UID {}".format(uid))
458
459
    return get_object(brain)
460
461
462
def get_brain_by_uid(uid, default=None):
463
    """Query a brain by a given UID
464
465
    :param uid: The UID of the object to find
466
    :type uid: string
467
    :returns: ZCatalog brain or None
468
    """
469
    if not is_uid(uid):
470
        return default
471
472
    # we try to find the object with the UID catalog
473
    uc = get_tool("uid_catalog")
474
475
    # try to find the object with the reference catalog first
476
    brains = uc(UID=uid)
477
    if len(brains) != 1:
478
        return default
479
    return brains[0]
480
481
482
def get_object_by_path(path, default=_marker):
483
    """Find an object by a given physical path or absolute_url
484
485
    :param path: The physical path of the object to find
486
    :type path: string
487
    :returns: Found Object or None
488
    """
489
490
    # nothing to do here
491
    if not path:
492
        if default is not _marker:
493
            return default
494
        fail("get_object_by_path first argument must be a path; {} received"
495
             .format(path))
496
497
    portal = get_portal()
498
    portal_path = get_path(portal)
499
    portal_url = get_url(portal)
500
501
    # ensure we have a physical path
502
    if path.startswith(portal_url):
503
        request = get_request()
504
        path = "/".join(request.physicalPathFromURL(path))
505
506
    if not path.startswith(portal_path):
507
        if default is not _marker:
508
            return default
509
        fail("Not a physical path inside the portal.")
510
511
    if path == portal_path:
512
        return portal
513
514
    return portal.restrictedTraverse(path, default)
515
516
517
def get_path(brain_or_object):
518
    """Calculate the physical path of this object
519
520
    :param brain_or_object: A single catalog brain or content object
521
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
522
    :returns: Physical path of the object
523
    :rtype: string
524
    """
525
    if is_brain(brain_or_object):
526
        return brain_or_object.getPath()
527
    return "/".join(get_object(brain_or_object).getPhysicalPath())
528
529
530
def get_parent_path(brain_or_object):
531
    """Calculate the physical parent path of this object
532
533
    :param brain_or_object: A single catalog brain or content object
534
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
535
    :returns: Physical path of the parent object
536
    :rtype: string
537
    """
538
    if is_portal(brain_or_object):
539
        return get_path(get_portal())
540
    if is_brain(brain_or_object):
541
        path = get_path(brain_or_object)
542
        return path.rpartition("/")[0]
543
    return get_path(get_object(brain_or_object).aq_parent)
544
545
546
def get_parent(brain_or_object, catalog_search=False):
547
    """Locate the parent object of the content/catalog brain
548
549
    The `catalog_search` switch uses the `portal_catalog` to do a search return
550
    a brain instead of the full parent object. However, if the search returned
551
    no results, it falls back to return the full parent object.
552
553
    :param brain_or_object: A single catalog brain or content object
554
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
555
    :param catalog_search: Use a catalog query to find the parent object
556
    :type catalog_search: bool
557
    :returns: parent object
558
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
559
    """
560
561
    if is_portal(brain_or_object):
562
        return get_portal()
563
564
    # Do a catalog search and return the brain
565
    if catalog_search:
566
        parent_path = get_parent_path(brain_or_object)
567
568
        # parent is the portal object
569
        if parent_path == get_path(get_portal()):
570
            return get_portal()
571
572
        # get the catalog tool
573
        pc = get_portal_catalog()
574
575
        # query for the parent path
576
        results = pc(path={
577
            "query": parent_path,
578
            "depth": 0})
579
580
        # No results fallback: return the parent object
581
        if not results:
582
            return get_object(brain_or_object).aq_parent
583
584
        # return the brain
585
        return results[0]
586
587
    return get_object(brain_or_object).aq_parent
588
589
590
def search(query, catalog=_marker):
591
    """Search for objects.
592
593
    :param query: A suitable search query.
594
    :type query: dict
595
    :param catalog: A single catalog id or a list of catalog ids
596
    :type catalog: str/list
597
    :returns: Search results
598
    :rtype: List of ZCatalog brains
599
    """
600
601
    # query needs to be a dictionary
602
    if not isinstance(query, dict):
603
        fail("Catalog query needs to be a dictionary")
604
605
    # Portal types to query
606
    portal_types = query.get("portal_type", [])
607
    # We want the portal_type as a list
608
    if not isinstance(portal_types, (tuple, list)):
609
        portal_types = [portal_types]
610
611
    # The catalogs used for the query
612
    catalogs = []
613
614
    # The user did **not** specify a catalog
615
    if catalog is _marker:
616
        # Find the registered catalogs for the queried portal types
617
        for portal_type in portal_types:
618
            # Just get the first registered/default catalog
619
            catalogs.append(get_catalogs_for(
620
                portal_type, default="portal_catalog")[0])
621
    else:
622
        # User defined catalogs
623
        if isinstance(catalog, (list, tuple)):
624
            catalogs.extend(map(get_tool, catalog))
625
        else:
626
            catalogs.append(get_tool(catalog))
627
628
    # Cleanup: Avoid duplicate catalogs
629
    catalogs = list(set(catalogs)) or [get_portal_catalog()]
630
631
    # We only support **single** catalog queries
632
    if len(catalogs) > 1:
633
        fail("Multi Catalog Queries are not supported!")
634
635
    return catalogs[0](query)
636
637
638
def safe_getattr(brain_or_object, attr, default=_marker):
639
    """Return the attribute value
640
641
    :param brain_or_object: A single catalog brain or content object
642
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
643
    :param attr: Attribute name
644
    :type attr: str
645
    :returns: Attribute value
646
    :rtype: obj
647
    """
648
    try:
649
        value = getattr(brain_or_object, attr, _marker)
650
        if value is _marker:
651
            if default is not _marker:
652
                return default
653
            fail("Attribute '{}' not found.".format(attr))
654
        if callable(value):
655
            return value()
656
        return value
657
    except Unauthorized:
658
        if default is not _marker:
659
            return default
660
        fail("You are not authorized to access '{}' of '{}'.".format(
661
            attr, repr(brain_or_object)))
662
663
664
def get_portal_catalog():
665
    """Get the portal catalog tool
666
667
    :returns: Portal Catalog Tool
668
    """
669
    return get_tool("portal_catalog")
670
671
672
def get_review_history(brain_or_object, rev=True):
673
    """Get the review history for the given brain or context.
674
675
    :param brain_or_object: A single catalog brain or content object
676
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
677
    :returns: Workflow history
678
    :rtype: [{}, ...]
679
    """
680
    obj = get_object(brain_or_object)
681
    review_history = []
682
    try:
683
        workflow = get_tool("portal_workflow")
684
        review_history = workflow.getInfoFor(obj, 'review_history')
685
    except WorkflowException as e:
686
        message = str(e)
687
        logger.error("Cannot retrieve review_history on {}: {}".format(
688
            obj, message))
689
    if not isinstance(review_history, (list, tuple)):
690
        logger.error("get_review_history: expected list, recieved {}".format(
691
            review_history))
692
        review_history = []
693
    if rev is True:
694
        review_history.reverse()
695
    return review_history
696
697
698
def get_revision_history(brain_or_object):
699
    """Get the revision history for the given brain or context.
700
701
    :param brain_or_object: A single catalog brain or content object
702
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
703
    :returns: Workflow history
704
    :rtype: obj
705
    """
706
    obj = get_object(brain_or_object)
707
    chv = ContentHistoryView(obj, safe_getattr(obj, "REQUEST", None))
708
    return chv.fullHistory()
709
710
711
def get_workflows_for(brain_or_object):
712
    """Get the assigned workflows for the given brain or context.
713
714
    Note: This function supports also the portal_type as parameter.
715
716
    :param brain_or_object: A single catalog brain or content object
717
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
718
    :returns: Assigned Workflows
719
    :rtype: tuple
720
    """
721
    workflow = ploneapi.portal.get_tool("portal_workflow")
722
    if isinstance(brain_or_object, basestring):
723
        return workflow.getChainFor(brain_or_object)
724
    obj = get_object(brain_or_object)
725
    return workflow.getChainFor(obj)
726
727
728
def get_workflow_status_of(brain_or_object, state_var="review_state"):
729
    """Get the current workflow status of the given brain or context.
730
731
    :param brain_or_object: A single catalog brain or content object
732
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
733
    :param state_var: The name of the state variable
734
    :type state_var: string
735
    :returns: Status
736
    :rtype: str
737
    """
738
    # Try to get the state from the catalog brain first
739
    if is_brain(brain_or_object):
740
        if state_var in brain_or_object.schema():
741
            return brain_or_object[state_var]
742
743
    # Retrieve the sate from the object
744
    workflow = get_tool("portal_workflow")
745
    obj = get_object(brain_or_object)
746
    return workflow.getInfoFor(ob=obj, name=state_var, default='')
747
748
749
def get_creation_date(brain_or_object):
750
    """Get the creation date of the brain or object
751
752
    :param brain_or_object: A single catalog brain or content object
753
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
754
    :returns: Creation date
755
    :rtype: DateTime
756
    """
757
    created = getattr(brain_or_object, "created", None)
758
    if created is None:
759
        fail("Object {} has no creation date ".format(
760
             repr(brain_or_object)))
761
    if callable(created):
762
        return created()
763
    return created
764
765
766
def get_modification_date(brain_or_object):
767
    """Get the modification date of the brain or object
768
769
    :param brain_or_object: A single catalog brain or content object
770
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
771
    :returns: Modification date
772
    :rtype: DateTime
773
    """
774
    modified = getattr(brain_or_object, "modified", None)
775
    if modified is None:
776
        fail("Object {} has no modification date ".format(
777
             repr(brain_or_object)))
778
    if callable(modified):
779
        return modified()
780
    return modified
781
782
783
def get_review_status(brain_or_object):
784
    """Get the `review_state` of an object
785
786
    :param brain_or_object: A single catalog brain or content object
787
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
788
    :returns: Value of the review_status variable
789
    :rtype: String
790
    """
791
    if is_brain(brain_or_object):
792
        return brain_or_object.review_state
793
    return get_workflow_status_of(brain_or_object, state_var="review_state")
794
795
796 View Code Duplication
def get_cancellation_status(brain_or_object, default="active"):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
797
    """Get the `cancellation_state` of an object
798
799
    :param brain_or_object: A single catalog brain or content object
800
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
801
    :returns: Value of the review_status variable
802
    :rtype: String
803
    """
804
805
    if is_brain(brain_or_object):
806
        state = getattr(brain_or_object, "cancellation_state", None)
807
        if state is not None:
808
            return state
809
810
    workflows = get_workflows_for(brain_or_object)
811
    if "bika_cancellation_workflow" in workflows:
812
        return get_workflow_status_of(brain_or_object, "cancellation_state")
813
814
    state = get_workflow_status_of(brain_or_object)
815
    if state not in ("active", "inactive"):
816
        return default
817
    return state
818
819
820 View Code Duplication
def get_inactive_status(brain_or_object, default="active"):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
821
    """Get the `cancellation_state` of an objct
822
823
    :param brain_or_object: A single catalog brain or content object
824
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
825
    :returns: Value of the review_status variable
826
    :rtype: String
827
    """
828
829
    if is_brain(brain_or_object):
830
        state = getattr(brain_or_object, "inactive_state", None)
831
        if state is not None:
832
            return state
833
834
    workflows = get_workflows_for(brain_or_object)
835
    if "bika_inactive_workflow" in workflows:
836
        return get_workflow_status_of(brain_or_object, "inactive_state")
837
838
    state = get_workflow_status_of(brain_or_object)
839
    if state not in ("active", "inactive"):
840
        return default
841
    return state
842
843
844
def is_active(brain_or_object):
845
    """Check if the workflow state of the object is 'inactive' or 'cancelled'.
846
847
    :param brain_or_object: A single catalog brain or content object
848
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
849
    :returns: False if the object is in the state 'inactive' or 'cancelled'
850
    :rtype: bool
851
    """
852
    if get_review_status(brain_or_object) == "cancelled":
853
        return False
854
    if get_inactive_status(brain_or_object) == "inactive":
855
        return False
856
    if get_cancellation_status(brain_or_object) == "cancelled":
857
        return False
858
    return True
859
860
861
def get_catalogs_for(brain_or_object, default="portal_catalog"):
862
    """Get all registered catalogs for the given portal_type, catalog brain or
863
    content object
864
865
    :param brain_or_object: The portal_type, a catalog brain or content object
866
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
867
    :returns: List of supported catalogs
868
    :rtype: list
869
    """
870
    archetype_tool = get_tool("archetype_tool", None)
871
    if not archetype_tool:
872
        # return the default catalog
873
        return [get_tool(default)]
874
875
    catalogs = []
876
877
    # get the registered catalogs for portal_type
878
    if is_object(brain_or_object):
879
        catalogs = archetype_tool.getCatalogsByType(
880
            get_portal_type(brain_or_object))
881
    if isinstance(brain_or_object, basestring):
882
        catalogs = archetype_tool.getCatalogsByType(brain_or_object)
883
884
    if not catalogs:
885
        return [get_tool(default)]
886
    return catalogs
887
888
889
def get_transitions_for(brain_or_object):
890
    """List available workflow transitions for all workflows
891
892
    :param brain_or_object: A single catalog brain or content object
893
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
894
    :returns: All possible available and allowed transitions
895
    :rtype: list[dict]
896
    """
897
    workflow = get_tool('portal_workflow')
898
    transitions = []
899
    instance = get_object(brain_or_object)
900
    for wfid in get_workflows_for(brain_or_object):
901
        wf = workflow[wfid]
902
        tlist = wf.getTransitionsFor(instance)
903
        transitions.extend([t for t in tlist if t not in transitions])
904
    return transitions
905
906
907
def do_transition_for(brain_or_object, transition):
908
    """Performs a workflow transition for the passed in object.
909
910
    :param brain_or_object: A single catalog brain or content object
911
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
912
    :returns: The object where the transtion was performed
913
    """
914
    if not isinstance(transition, basestring):
915
        fail("Transition type needs to be string, got '%s'" % type(transition))
916
    obj = get_object(brain_or_object)
917
    try:
918
        ploneapi.content.transition(obj, transition)
919
    except ploneapi.exc.InvalidParameterError as e:
920
        fail("Failed to perform transition '{}' on {}: {}".format(
921
             transition, obj, str(e)))
922
    return obj
923
924
925
def get_roles_for_permission(permission, brain_or_object):
926
    """Get a list of granted roles for the given permission on the object.
927
928
    :param brain_or_object: A single catalog brain or content object
929
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
930
    :returns: Roles for the given Permission
931
    :rtype: list
932
    """
933
    obj = get_object(brain_or_object)
934
    allowed = set(rolesForPermissionOn(permission, obj))
935
    return sorted(allowed)
936
937
938
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
939
    """Checks if the passed in object is versionable.
940
941
    :param brain_or_object: A single catalog brain or content object
942
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
943
    :returns: True if the object is versionable
944
    :rtype: bool
945
    """
946
    pr = get_tool("portal_repository")
947
    obj = get_object(brain_or_object)
948
    return pr.supportsPolicy(obj, 'at_edit_autoversion') \
949
        and pr.isVersionable(obj)
950
951
952
def get_version(brain_or_object):
953
    """Get the version of the current object
954
955
    :param brain_or_object: A single catalog brain or content object
956
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
957
    :returns: The current version of the object, or None if not available
958
    :rtype: int or None
959
    """
960
    obj = get_object(brain_or_object)
961
    if not is_versionable(obj):
962
        return None
963
    return getattr(aq_base(obj), "version_id", 0)
964
965
966
def get_view(name, context=None, request=None):
967
    """Get the view by name
968
969
    :param name: The name of the view
970
    :type name: str
971
    :param context: The context to query the view
972
    :type context: ATContentType/DexterityContentType/CatalogBrain
973
    :param request: The request to query the view
974
    :type request: HTTPRequest object
975
    :returns: HTTP Request
976
    :rtype: Products.Five.metaclass View object
977
    """
978
    context = context or get_portal()
979
    request = request or get_request() or None
980
    return getMultiAdapter((get_object(context), request), name=name)
981
982
983
def get_request():
984
    """Get the global request object
985
986
    :returns: HTTP Request
987
    :rtype: HTTPRequest object
988
    """
989
    return globalrequest.getRequest()
990
991
992
def get_group(group_or_groupname):
993
    """Return Plone Group
994
995
    :param group_or_groupname: Plone group or the name of the group
996
    :type groupname:  GroupData/str
997
    :returns: Plone GroupData
998
    """
999
    if not group_or_groupname:
1000
1001
        return None
1002
    if hasattr(group_or_groupname, "_getGroup"):
1003
        return group_or_groupname
1004
    gtool = get_tool("portal_groups")
1005
    return gtool.getGroupById(group_or_groupname)
1006
1007
1008
def get_user(user_or_username):
1009
    """Return Plone User
1010
1011
    :param user_or_username: Plone user or user id
1012
    :returns: Plone MemberData
1013
    """
1014
    if not user_or_username:
1015
        return None
1016
    if hasattr(user_or_username, "getUserId"):
1017
        return ploneapi.user.get(user_or_username.getUserId())
1018
    return ploneapi.user.get(userid=user_or_username)
1019
1020
1021
def get_user_properties(user_or_username):
1022
    """Return User Properties
1023
1024
    :param user_or_username: Plone group identifier
1025
    :returns: Plone MemberData
1026
    """
1027
    user = get_user(user_or_username)
1028
    if user is None:
1029
        return {}
1030
    if not callable(user.getUser):
1031
        return {}
1032
    out = {}
1033
    plone_user = user.getUser()
1034
    for sheet in plone_user.listPropertysheets():
1035
        ps = plone_user.getPropertysheet(sheet)
1036
        out.update(dict(ps.propertyItems()))
1037
    return out
1038
1039
1040
def get_users_by_roles(roles=None):
1041
    """Search Plone users by their roles
1042
1043
    :param roles: Plone role name or list of roles
1044
    :type roles:  list/str
1045
    :returns: List of Plone users having the role(s)
1046
    """
1047
    if not isinstance(roles, (tuple, list)):
1048
        roles = [roles]
1049
    mtool = get_tool("portal_membership")
1050
    return mtool.searchForMembers(roles=roles)
1051
1052
1053
def get_current_user():
1054
    """Returns the current logged in user
1055
1056
    :returns: Current User
1057
    """
1058
    return ploneapi.user.get_current()
1059
1060
1061
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
1062
    """Returns the associated contact of a Plone user
1063
1064
    If the user passed in has no contact associated, return None.
1065
    The `contact_types` parameter filter the portal types for the search.
1066
1067
    :param: Plone user
1068
    :contact_types: List with the contact portal types to search
1069
    :returns: Contact associated to the Plone user or None
1070
    """
1071
    if not user:
1072
        return None
1073
1074
    query = {'portal_type': contact_types, 'getUsername': user.id}
1075
    brains = search(query, catalog='portal_catalog')
1076
    if not brains:
1077
        return None
1078
1079
    if len(brains) > 1:
1080
        # Oops, the user has multiple contacts assigned, return None
1081
        contacts = map(lambda c: c.Title, brains)
1082
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
1083
        err_msg = err_msg.format(user.id, ','.join(contacts))
1084
        logger.error(err_msg)
1085
        return None
1086
1087
    return get_object(brains[0])
1088
1089
1090
def get_user_client(user_or_contact):
1091
    """Returns the client of the contact of a Plone user
1092
1093
    If the user passed in has no contact or does not belong to any client,
1094
    returns None.
1095
1096
    :param: Plone user or contact
1097
    :returns: Client the contact of the Plone user belongs to
1098
    """
1099
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
1100
        # Lab contacts cannot belong to a client
1101
        return None
1102
1103
    if not IContact.providedBy(user_or_contact):
1104
        contact = get_user_contact(user_or_contact, contact_types=['Contact'])
1105
        if IContact.providedBy(contact):
1106
            return get_user_client(contact)
1107
        return None
1108
1109
    client = get_parent(user_or_contact)
1110
    if client and IClient.providedBy(client):
1111
        return client
1112
1113
    return None
1114
1115
1116
def get_current_client():
1117
    """Returns the current client the current logged in user belongs to, if any
1118
1119
    :returns: Client the current logged in user belongs to or None
1120
    """
1121
    return get_user_client(get_current_user())
1122
1123
1124
def get_cache_key(brain_or_object):
1125
    """Generate a cache key for a common brain or object
1126
1127
    :param brain_or_object: A single catalog brain or content object
1128
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1129
    :returns: Cache Key
1130
    :rtype: str
1131
    """
1132
    key = [
1133
        get_portal_type(brain_or_object),
1134
        get_id(brain_or_object),
1135
        get_uid(brain_or_object),
1136
        # handle different domains gracefully
1137
        get_url(brain_or_object),
1138
        # Return the microsecond since the epoch in GMT
1139
        get_modification_date(brain_or_object).micros(),
1140
    ]
1141
    return "-".join(map(lambda x: str(x), key))
1142
1143
1144
def bika_cache_key_decorator(method, self, brain_or_object):
1145
    """Bika cache key decorator usable for
1146
1147
    :param brain_or_object: A single catalog brain or content object
1148
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
1149
    :returns: Cache Key
1150
    :rtype: str
1151
    """
1152
    if brain_or_object is None:
1153
        raise DontCache
1154
    return get_cache_key(brain_or_object)
1155
1156
1157
def normalize_id(string):
1158
    """Normalize the id
1159
1160
    :param string: A string to normalize
1161
    :type string: str
1162
    :returns: Normalized ID
1163
    :rtype: str
1164
    """
1165
    if not isinstance(string, basestring):
1166
        fail("Type of argument must be string, found '{}'"
1167
             .format(type(string)))
1168
    # get the id nomalizer utility
1169
    normalizer = getUtility(IIDNormalizer).normalize
1170
    return normalizer(string)
1171
1172
1173
def normalize_filename(string):
1174
    """Normalize the filename
1175
1176
    :param string: A string to normalize
1177
    :type string: str
1178
    :returns: Normalized ID
1179
    :rtype: str
1180
    """
1181
    if not isinstance(string, basestring):
1182
        fail("Type of argument must be string, found '{}'"
1183
             .format(type(string)))
1184
    # get the file nomalizer utility
1185
    normalizer = getUtility(IFileNameNormalizer).normalize
1186
    return normalizer(string)
1187
1188
1189
def is_uid(uid, validate=False):
1190
    """Checks if the passed in uid is a valid UID
1191
1192
    :param uid: The uid to check
1193
    :param validate: If False, checks if uid is a valid 23 alphanumeric uid. If
1194
    True, also verifies if a brain exists for the uid passed in
1195
    :type uid: string
1196
    :return: True if a valid uid
1197
    :rtype: bool
1198
    """
1199
    if not isinstance(uid, basestring):
1200
        return False
1201
    if uid == '0':
1202
        return True
1203
    if len(uid) != 32:
1204
        return False
1205
    if not UID_RX.match(uid):
1206
        return False
1207
    if not validate:
1208
        return True
1209
1210
    # Check if a brain for this uid exists
1211
    uc = get_tool('uid_catalog')
1212
    brains = uc(UID=uid)
1213
    if brains:
1214
        assert (len(brains) == 1)
1215
    return len(brains) > 0
1216
1217
1218
def is_date(date):
1219
    """Checks if the passed in value is a valid Zope's DateTime
1220
1221
    :param date: The date to check
1222
    :type date: DateTime
1223
    :return: True if a valid date
1224
    :rtype: bool
1225
    """
1226
    if not date:
1227
        return False
1228
    return isinstance(date, (DateTime, datetime))
1229
1230
1231
def to_date(value, default=None):
1232
    """Tries to convert the passed in value to Zope's DateTime
1233
1234
    :param value: The value to be converted to a valid DateTime
1235
    :type value: str, DateTime or datetime
1236
    :return: The DateTime representation of the value passed in or default
1237
    """
1238
    if isinstance(value, DateTime):
1239
        return value
1240
    if not value:
1241
        if default is None:
1242
            return None
1243
        return to_date(default)
1244
    try:
1245
        if isinstance(value, str) and '.' in value:
1246
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
1247
            return DateTime(value, datefmt='international')
1248
        return DateTime(value)
1249
    except (TypeError, ValueError, DateTimeError):
1250
        return to_date(default)
1251
1252
1253
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
1254
               round_to_int=True):
1255
    """Returns the computed total number of minutes
1256
    """
1257
    total = float(days)*24*60 + float(hours)*60 + float(minutes) + \
1258
        float(seconds)/60 + float(milliseconds)/1000/60
1259
    return int(round(total)) if round_to_int else total
1260
1261
1262
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
1263
    """Returns a representation of time in a string in xd yh zm format
1264
    """
1265
    minutes = to_minutes(days=days, hours=hours, minutes=minutes,
1266
                         seconds=seconds, milliseconds=milliseconds)
1267
    delta = timedelta(minutes=int(round(minutes)))
1268
    d = delta.days
1269
    h = delta.seconds // 3600
1270
    m = (delta.seconds // 60) % 60
1271
    m = m and "{}m ".format(str(m)) or ""
1272
    d = d and "{}d ".format(str(d)) or ""
1273
    if m and d:
1274
        h = "{}h ".format(str(h))
1275
    else:
1276
        h = h and "{}h ".format(str(h)) or ""
1277
    return "".join([d, h, m]).strip()
1278
1279
1280
def to_int(value, default=_marker):
1281
    """Tries to convert the value to int.
1282
    Truncates at the decimal point if the value is a float
1283
1284
    :param value: The value to be converted to an int
1285
    :return: The resulting int or default
1286
    """
1287
    if is_floatable(value):
1288
        value = to_float(value)
1289
    try:
1290
        return int(value)
1291
    except (TypeError, ValueError):
1292
        if default is None:
1293
            return default
1294
        if default is not _marker:
1295
            return to_int(default)
1296
        fail("Value %s cannot be converted to int" % repr(value))
1297
1298
1299
def is_floatable(value):
1300
    """Checks if the passed in value is a valid floatable number
1301
1302
    :param value: The value to be evaluated as a float number
1303
    :type value: str, float, int
1304
    :returns: True if is a valid float number
1305
    :rtype: bool"""
1306
    try:
1307
        float(value)
1308
        return True
1309
    except (TypeError, ValueError):
1310
        return False
1311
1312
1313
def to_float(value, default=_marker):
1314
    """Converts the passed in value to a float number
1315
1316
    :param value: The value to be converted to a floatable number
1317
    :type value: str, float, int
1318
    :returns: The float number representation of the passed in value
1319
    :rtype: float
1320
    """
1321
    if not is_floatable(value):
1322
        if default is not _marker:
1323
            return to_float(default)
1324
        fail("Value %s is not floatable" % repr(value))
1325
    return float(value)
1326
1327
1328
def to_searchable_text_metadata(value):
1329
    """Parse the given metadata value to searchable text
1330
1331
    :param value: The raw value of the metadata column
1332
    :returns: Searchable and translated unicode value or None
1333
    """
1334
    if not value:
1335
        return u""
1336
    if value is Missing.Value:
1337
        return u""
1338
    if is_uid(value):
1339
        return u""
1340
    if isinstance(value, (bool)):
1341
        return u""
1342
    if isinstance(value, (list, tuple)):
1343
        for v in value:
1344
            return to_searchable_text_metadata(v)
1345
    if isinstance(value, (dict)):
1346
        for k, v in value.items():
1347
            return to_searchable_text_metadata(v)
1348
    if is_date(value):
1349
        return value.strftime("%Y-%m-%d")
1350
    if not isinstance(value, basestring):
1351
        value = str(value)
1352
    return safe_unicode(value)
1353
1354
1355
def get_registry_record(name, default=None):
1356
    """Returns the value of a registry record
1357
1358
    :param name: [required] name of the registry record
1359
    :type name: str
1360
    :param default: The value returned if the record is not found
1361
    :type default: anything
1362
    :returns: value of the registry record
1363
    """
1364
    return ploneapi.portal.get_registry_record(name, default=default)
1365
1366
1367
def to_display_list(pairs, sort_by="key", allow_empty=True):
1368
    """Create a Plone DisplayList from list items
1369
1370
    :param pairs: list of key, value pairs
1371
    :param sort_by: Sort the items either by key or value
1372
    :param allow_empty: Allow to select an empty value
1373
    :returns: Plone DisplayList
1374
    """
1375
    dl = DisplayList()
1376
1377
    if isinstance(pairs, basestring):
1378
        pairs = [pairs, pairs]
1379
    for pair in pairs:
1380
        # pairs is a list of lists -> add each pair
1381
        if isinstance(pair, (tuple, list)):
1382
            dl.add(*pair)
1383
        # pairs is just a single pair -> add it and stop
1384
        if isinstance(pair, basestring):
1385
            dl.add(*pairs)
1386
            break
1387
1388
    # add the empty option
1389
    if allow_empty:
1390
        dl.add("", "")
1391
1392
    # sort by key/value
1393
    if sort_by == "key":
1394
        dl = dl.sortedByKey()
1395
    elif sort_by == "value":
1396
        dl = dl.sortedByValue()
1397
1398
    return dl
1399