| Total Complexity | 213 | 
| Total Lines | 1341 | 
| Duplicated Lines | 3.28 % | 
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like bika.lims.api often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*-  | 
            ||
| 2 | #  | 
            ||
| 3 | # This file is part of SENAITE.CORE  | 
            ||
| 4 | #  | 
            ||
| 5 | # Copyright 2018 by its authors.  | 
            ||
| 6 | # Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.  | 
            ||
| 7 | |||
| 8 | import Missing  | 
            ||
| 9 | import re  | 
            ||
| 10 | |||
| 11 | from Acquisition import aq_base  | 
            ||
| 12 | from AccessControl.PermissionRole import rolesForPermissionOn  | 
            ||
| 13 | |||
| 14 | from datetime import datetime  | 
            ||
| 15 | from DateTime import DateTime  | 
            ||
| 16 | |||
| 17 | from Products.CMFPlone.utils import base_hasattr, safe_unicode  | 
            ||
| 18 | from Products.CMFCore.interfaces import ISiteRoot  | 
            ||
| 19 | from Products.CMFCore.interfaces import IFolderish  | 
            ||
| 20 | from Products.Archetypes.BaseObject import BaseObject  | 
            ||
| 21 | from Products.ZCatalog.interfaces import ICatalogBrain  | 
            ||
| 22 | from Products.CMFPlone.utils import _createObjectByType  | 
            ||
| 23 | from Products.CMFCore.WorkflowCore import WorkflowException  | 
            ||
| 24 | from Products.CMFCore.utils import getToolByName  | 
            ||
| 25 | from bika.lims.interfaces import IClient, IContact, ILabContact  | 
            ||
| 26 | |||
| 27 | from zope import globalrequest  | 
            ||
| 28 | from zope.event import notify  | 
            ||
| 29 | from zope.interface import implements  | 
            ||
| 30 | from zope.component import getUtility  | 
            ||
| 31 | from zope.component import getMultiAdapter  | 
            ||
| 32 | from zope.component.interfaces import IFactory  | 
            ||
| 33 | from zope.component.interfaces import ObjectEvent  | 
            ||
| 34 | from zope.component.interfaces import IObjectEvent  | 
            ||
| 35 | from zope.lifecycleevent import modified  | 
            ||
| 36 | from zope.lifecycleevent import ObjectCreatedEvent  | 
            ||
| 37 | from zope.security.interfaces import Unauthorized  | 
            ||
| 38 | |||
| 39 | from plone import api as ploneapi  | 
            ||
| 40 | from plone.memoize.volatile import DontCache  | 
            ||
| 41 | from plone.api.exc import InvalidParameterError  | 
            ||
| 42 | from plone.dexterity.interfaces import IDexterityContent  | 
            ||
| 43 | from plone.app.layout.viewlets.content import ContentHistoryView  | 
            ||
| 44 | from plone.i18n.normalizer.interfaces import IFileNameNormalizer  | 
            ||
| 45 | from plone.i18n.normalizer.interfaces import IIDNormalizer  | 
            ||
| 46 | |||
| 47 | from bika.lims import logger  | 
            ||
| 48 | |||
| 49 | """Bika LIMS Framework API  | 
            ||
| 50 | |||
| 51 | Please see bika.lims/docs/API.rst for documentation.  | 
            ||
| 52 | |||
| 53 | Architectural Notes:  | 
            ||
| 54 | |||
| 55 | Please add only functions that do a single thing for a single object.  | 
            ||
| 56 | |||
| 57 | Good: `def get_foo(brain_or_object)`  | 
            ||
| 58 | Bad: `def get_foos(list_of_brain_objects)`  | 
            ||
| 59 | |||
| 60 | Why?  | 
            ||
| 61 | |||
| 62 | Because it makes things more complex. You can always use a pattern like this to  | 
            ||
| 63 | achieve the same::  | 
            ||
| 64 | |||
| 65 | >>> foos = map(get_foo, list_of_brain_objects)  | 
            ||
| 66 | |||
| 67 | Please add for all of your functions a descriptive test in docs/API.rst.  | 
            ||
| 68 | |||
| 69 | Thanks.  | 
            ||
| 70 | """  | 
            ||
| 71 | |||
| 72 | _marker = object()  | 
            ||
| 73 | |||
| 74 | UID_RX = re.compile("[a-z0-9]{32}$") | 
            ||
| 75 | |||
| 76 | |||
class BikaLIMSError(Exception):
    """Generic bika.lims error; base class for bika.lims exceptions."""
            ||
| 79 | |||
| 80 | |||
class IBikaTransitionEvent(IObjectEvent):
    """Common interface for all Bika workflow transition events"""
            ||
| 83 | |||
| 84 | |||
class IBikaBeforeTransitionEvent(IBikaTransitionEvent):
    """Event interface: fired before the transition is invoked"""
            ||
| 87 | |||
| 88 | |||
class IBikaAfterTransitionEvent(IBikaTransitionEvent):
    """Event interface: fired after the transition is done"""
            ||
| 91 | |||
| 92 | |||
class IBikaTransitionFailedEvent(IBikaTransitionEvent):
    """Event interface: fired if the transition failed"""
            ||
| 95 | |||
| 96 | |||
class BikaTransitionEvent(ObjectEvent):
    """Object event that carries the data of a Bika workflow transition

    :param obj: The object the event is about
    :param transition: The transition this event refers to
    :param exception: Optional exception to attach to the event
    """

    def __init__(self, obj, transition, exception=None):
        # Let the base event store the object first ...
        ObjectEvent.__init__(self, obj)
        # ... then remember the transition data on the event itself
        self.exception = exception
        self.transition = transition
        self.obj = obj
            ||
| 104 | |||
| 105 | |||
class BikaBeforeTransitionEvent(BikaTransitionEvent):
    """Transition event declared to implement `IBikaBeforeTransitionEvent`"""
    implements(IBikaBeforeTransitionEvent)
            ||
| 108 | |||
| 109 | |||
class BikaAfterTransitionEvent(BikaTransitionEvent):
    """Transition event declared to implement `IBikaAfterTransitionEvent`"""
    implements(IBikaAfterTransitionEvent)
            ||
| 112 | |||
| 113 | |||
class BikaTransitionFailedEvent(BikaTransitionEvent):
    """Transition event declared to implement `IBikaTransitionFailedEvent`"""
    implements(IBikaTransitionFailedEvent)
            ||
| 116 | |||
| 117 | |||
def get_portal():
    """Return the site object obtained from `ploneapi.portal.getSite()`

    :returns: Portal object
    """
    portal = ploneapi.portal.getSite()
    return portal
            ||
| 124 | |||
| 125 | |||
def get_bika_setup():
    """Look up the `bika_setup` folder on the portal object

    :returns: The result of `portal.get("bika_setup")`
    """
    return get_portal().get("bika_setup")
            ||
| 131 | |||
| 132 | |||
def create(container, portal_type, *args, **kwargs):
    """Creates an object in Bika LIMS

    This code uses most of the parts from the TypesTool
    see: `Products.CMFCore.TypesTool._constructInstance`

    :param container: The container the new object is added to
    :type container: ATContentType/DexterityContentType
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param title: Optional keyword argument with the title for the new
        content object; defaults to "New <portal_type>"
    :type title: string
    :returns: The new created object
    :raises BikaLIMSError: if no type information is registered for the
        given portal type

    NOTE(review): the container is used as-is (no `get_object` call), so a
    full object rather than a catalog brain is expected here -- confirm.
    """
    from bika.lims.utils import tmpID
    if kwargs.get("title") is None:
        kwargs["title"] = "New {}".format(portal_type)

    # generate a temporary ID
    tmp_id = tmpID()

    # get the fti
    types_tool = get_tool("portal_types")
    fti = types_tool.getTypeInfo(portal_type)

    # Without type information we would only crash below with an opaque
    # AttributeError on `None`; report the real cause instead.
    if fti is None:
        fail("No type information found for portal type '{}'"
             .format(portal_type))

    if fti.product:
        obj = _createObjectByType(portal_type, container, tmp_id)
    else:
        # newstyle factory
        factory = getUtility(IFactory, fti.factory)
        obj = factory(tmp_id, *args, **kwargs)
        if hasattr(obj, '_setPortalTypeName'):
            obj._setPortalTypeName(fti.getId())
        notify(ObjectCreatedEvent(obj))
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(tmp_id, obj)
        # we get the object here with the current object id, as it might be
        # renamed already by an event handler
        obj = container._getOb(obj.getId())

    # handle AT Content
    if is_at_content(obj):
        obj.processForm()

    # Edit after processForm; processForm does AT unmarkCreationFlag.
    obj.edit(**kwargs)

    # explicit notification
    modified(obj)
    return obj
            ||
| 183 | |||
| 184 | |||
def get_tool(name, context=None, default=_marker):
    """Get a portal tool by name

    The tool is looked up on the given context first. If that lookup fails,
    or if no context was given, `plone.api.portal.get_tool` is used.

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: A portal object
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value to return if the tool could not be found by the
        plone.api lookup
    :returns: Portal Tool
    :raises BikaLIMSError: if the tool was not found and no default was given
    """

    # Try first with the context
    if context is not None:
        try:
            context = get_object(context)
            return getToolByName(context, name)
        except (BikaLIMSError, AttributeError) as e:
            # https://github.com/senaite/bika.lims/issues/396
            # NOTE: `warning` instead of the deprecated `warn` alias
            logger.warning(
                "get_tool::getToolByName({}, '{}') failed: {} "
                "-> falling back to plone.api.portal.get_tool('{}')"
                .format(repr(context), name, repr(e), name))
            return get_tool(name, default=default)

    # Try with the plone api
    try:
        return ploneapi.portal.get_tool(name)
    except InvalidParameterError:
        if default is not _marker:
            return default
        fail("No tool named '%s' found." % name)
            ||
| 214 | |||
| 215 | |||
def fail(msg=None):
    """Raise a `BikaLIMSError` with the given message

    :param msg: Reason of the failure; a generic text is used if omitted
    :raises BikaLIMSError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise BikaLIMSError("{}".format(reason))
            ||
| 222 | |||
| 223 | |||
def is_object(brain_or_object):
    """Check if the passed in object is a supported portal content object

    Supported are the portal root, AT contents, Dexterity contents and
    catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    :rtype: bool
    """
    # The checks run in this order and stop at the first match
    checks = (is_portal, is_at_content, is_dexterity_content, is_brain)
    for check in checks:
        if check(brain_or_object):
            return True
    return False
            ||
| 240 | |||
| 241 | |||
def get_object(brain_or_object):
    """Get the full content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: PortalObject/ATContentType/DexterityContentType
        /CatalogBrain
    :returns: The full object
    :raises BikaLIMSError: if the passed in value is not supported
    """
    if not is_object(brain_or_object):
        fail("{} is not supported.".format(repr(brain_or_object)))
    # Anything but a brain is handed back unchanged
    if not is_brain(brain_or_object):
        return brain_or_object
    return brain_or_object.getObject()
            ||
| 255 | |||
| 256 | |||
def is_portal(brain_or_object):
    """Tell whether the passed in object provides `ISiteRoot`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is the portal root object
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
            ||
| 266 | |||
| 267 | |||
def is_brain(brain_or_object):
    """Tell whether the passed in object provides `ICatalogBrain`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a catalog brain
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
            ||
| 277 | |||
| 278 | |||
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object provides `IDexterityContent`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a dexterity content type
    :rtype: bool
    """
    provides_dexterity = IDexterityContent.providedBy(brain_or_object)
    return provides_dexterity
            ||
| 288 | |||
| 289 | |||
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an Archetypes `BaseObject`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an AT content type
    :rtype: bool
    """
    derives_from_at = isinstance(brain_or_object, BaseObject)
    return derives_from_at
            ||
| 299 | |||
| 300 | |||
def is_folderish(brain_or_object):
    """Checks if the passed in object is folderish

    An `is_folderish` attribute on the passed in object takes precedence (it
    is called if it is callable); otherwise the full object is checked for
    `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))
    if callable(brain_or_object.is_folderish):
        return brain_or_object.is_folderish()
    return brain_or_object.is_folderish
            ||
| 314 | |||
| 315 | |||
def get_portal_type(brain_or_object):
    """Read the `portal_type` attribute of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises BikaLIMSError: if the passed in value is not supported
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
            ||
| 327 | |||
| 328 | |||
def get_schema(brain_or_object):
    """Get the schema of the content

    Dexterity schemas are looked up through the type information of the
    `portal_types` tool, AT schemas through the `Schema()` method of the
    object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises BikaLIMSError: for the portal root and for objects that are
        neither Dexterity nor AT contents
    """
    content = get_object(brain_or_object)
    if is_portal(content):
        fail("get_schema can't return schema of portal root")
    if is_dexterity_content(content):
        type_info = get_tool("portal_types").getTypeInfo(content.portal_type)
        return type_info.lookupSchema()
    if is_at_content(content):
        return content.Schema()
    fail("{} has no Schema.".format(brain_or_object))
            ||
| 346 | |||
| 347 | |||
def get_fields(brain_or_object):
    """Get the fields of the object's schema

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of field name -> field
    :rtype: dict
    """
    content = get_object(brain_or_object)
    schema = get_schema(content)
    if not is_dexterity_content(content):
        # non-Dexterity schema: pair the schema keys with the schema fields
        return dict(zip(schema.keys(), schema.fields()))
    # Dexterity schema: fetch each field by its name
    return {name: schema.get(name) for name in schema.names()}
            ||
| 363 | |||
| 364 | |||
def get_id(brain_or_object):
    """Get the Plone ID for this object

    For brains that have a `getId` attribute this attribute is returned,
    otherwise `getId()` is called on the full object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "getId"):
            return brain_or_object.getId
    content = get_object(brain_or_object)
    return content.getId()
            ||
| 376 | |||
| 377 | |||
def get_title(brain_or_object):
    """Get the Title for this object

    For brains that have a `Title` attribute this attribute is returned,
    otherwise `Title()` is called on the full object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Title"):
            return brain_or_object.Title
    content = get_object(brain_or_object)
    return content.Title()
            ||
| 389 | |||
| 390 | |||
def get_description(brain_or_object):
    """Get the Description for this object

    For brains that have a `Description` attribute this attribute is
    returned, otherwise `Description()` is called on the full object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "Description"):
            return brain_or_object.Description
    content = get_object(brain_or_object)
    return content.Description()
            ||
| 403 | |||
| 404 | |||
def get_uid(brain_or_object):
    """Get the Plone UID for this object

    The portal root always yields '0'. For brains that have a `UID`
    attribute this attribute is returned, otherwise `UID()` is called on the
    full object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_portal(brain_or_object):
        return '0'
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "UID"):
            return brain_or_object.UID
    content = get_object(brain_or_object)
    return content.UID()
            ||
| 418 | |||
| 419 | |||
def get_url(brain_or_object):
    """Get the absolute URL for this object

    For brains that have a `getURL` attribute the result of `getURL()` is
    returned, otherwise `absolute_url()` is called on the full object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    if is_brain(brain_or_object):
        if base_hasattr(brain_or_object, "getURL"):
            return brain_or_object.getURL()
    content = get_object(brain_or_object)
    return content.absolute_url()
            ||
| 431 | |||
| 432 | |||
def get_icon(brain_or_object, html_tag=True):
    """Get the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url. An
        empty string is returned if the type has no icon.
    :rtype: string
    """
    # `html.escape` is preferred; `cgi.escape` is the fallback for
    # interpreters that do not provide it.
    try:
        from html import escape
    except ImportError:
        from cgi import escape

    # Manual approach, because `plone.app.layout.getIcon` does not reliable
    # work for Bika Contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url
    # The title ends up inside an HTML attribute: escape markup characters
    # and quotes so that it can neither break the tag nor inject HTML.
    # The title is rendered to text first because `escape` needs a string.
    title = escape("{}".format(get_title(brain_or_object)), True)
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
        url=url, title=title)
    return tag
            ||
| 457 | |||
| 458 | |||
def get_object_by_uid(uid, default=_marker):
    """Find an object by a given UID

    The `uid_catalog` is queried first, then the portal catalog. The UID '0'
    is reserved for the portal object.

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Value to return if the UID is empty or no object was
        found
    :returns: Found object or the default value
    :raises BikaLIMSError: if the UID is empty or nothing was found and no
        default was given
    """

    # nothing to do here
    if not uid:
        if default is _marker:
            fail("get_object_by_uid requires UID as first argument; got {} instead"
                 .format(uid))
        return default

    # we defined the portal object UID to be '0'::
    if uid == '0':
        return get_portal()

    # both catalogs are fetched up front
    portal_catalog = get_portal_catalog()
    uid_catalog = get_tool("uid_catalog")

    # first attempt: the uid catalog
    uid_brains = uid_catalog(UID=uid)
    if uid_brains:
        return uid_brains[0].getObject()

    # second attempt: the portal catalog
    catalog_brains = portal_catalog(UID=uid)
    if catalog_brains:
        return get_object(catalog_brains[0])

    if default is _marker:
        fail("No object found for UID {}".format(uid))
    return default
            ||
| 495 | |||
| 496 | |||
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    A path that starts with the portal URL is converted to a physical path
    with the help of the current request before the catalog is queried.

    :param path: The physical path of the object to find
    :type path: string
    :param default: Value to return if the path is empty, lies outside the
        portal or no object was found
    :returns: Found object or the default value
    :raises BikaLIMSError: if the lookup fails and no default was given
    """

    # nothing to do here
    if not path:
        if default is _marker:
            fail("get_object_by_path first argument must be a path; {} received"
                 .format(path))
        return default

    catalog = get_portal_catalog()
    site = get_portal()
    site_path = get_path(site)
    site_url = get_url(site)

    # ensure we have a physical path
    if path.startswith(site_url):
        request = get_request()
        path = "/".join(request.physicalPathFromURL(path))

    if not path.startswith(site_path):
        if default is _marker:
            fail("Not a physical path inside the portal.")
        return default

    # the portal itself is returned directly, without a catalog query
    if path == site_path:
        return site

    brains = catalog(path={"query": path, "depth": 0})
    if brains:
        return get_object(brains[0])

    if default is _marker:
        fail("Object at path '{}' not found".format(path))
    return default
            ||
| 536 | |||
| 537 | |||
def get_path(brain_or_object):
    """Calculate the physical path of this object

    Brains answer with `getPath()`; for all other objects the segments of
    `getPhysicalPath()` are joined with "/".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
            ||
| 549 | |||
| 550 | |||
def get_parent_path(brain_or_object):
    """Calculate the physical parent path of this object

    For the portal root the path of the portal itself is returned. For
    brains the last segment of the brain path is cut off; for all other
    objects the path of `aq_parent` is used.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if not is_brain(brain_or_object):
        parent = get_object(brain_or_object).aq_parent
        return get_path(parent)
    # everything before the last "/" is the path of the parent
    head, _sep, _tail = get_path(brain_or_object).rpartition("/")
    return head
            ||
| 565 | |||
| 566 | |||
def get_parent(brain_or_object, catalog_search=False):
    """Locate the parent object of the content/catalog brain

    With `catalog_search` enabled, the portal catalog is queried for the
    parent path and the matching brain is returned instead of the full
    parent object. If the query yields nothing, the full parent object is
    returned. The portal object is never returned as a brain.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """

    if is_portal(brain_or_object):
        return get_portal()

    # Plain lookup through acquisition
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # parent is the portal object
    if parent_path == get_path(get_portal()):
        return get_portal()

    # query the portal catalog for exactly the parent path
    catalog = get_portal_catalog()
    brains = catalog(path={"query": parent_path, "depth": 0})

    # return the brain
    if brains:
        return brains[0]

    # No results fallback: return the parent object
    return get_object(brain_or_object).aq_parent
            ||
| 609 | |||
| 610 | |||
def search(query, catalog=_marker):
    """Run a catalog query and return the results.

    If no catalog is passed in, the first catalog registered for each queried
    portal type is used. If no catalog could be determined at all, the portal
    catalog is used. Queries that resolve to more than one catalog are
    reported through `fail`.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    """
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # Normalize the queried portal types to a sequence
    queried_types = query.get("portal_type", [])
    if not isinstance(queried_types, (tuple, list)):
        queried_types = [queried_types]

    if catalog is _marker:
        # No catalog given: take the first registered/default catalog of
        # each queried portal type
        tools = [
            get_catalogs_for(queried_type, default="portal_catalog")[0]
            for queried_type in queried_types
        ]
    elif isinstance(catalog, (list, tuple)):
        # Several catalog ids given
        tools = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        # A single catalog id given
        tools = [get_tool(catalog)]

    # Drop duplicates; fall back to the portal catalog if nothing is left
    unique_tools = list(set(tools)) or [get_portal_catalog()]

    # Only queries against a **single** catalog are supported
    if len(unique_tools) > 1:
        fail("Multi Catalog Queries are not supported, please specify a catalog.")

    return unique_tools[0](query)
| 657 | |||
| 658 | |||
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of an attribute, calling it if it is callable.

    If the attribute does not exist, or accessing it raises `Unauthorized`,
    the `default` is returned when one was passed in; otherwise `fail` is
    called with an explanatory message.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Optional fallback value
    :returns: Attribute value
    :rtype: obj
    """
    has_default = default is not _marker
    try:
        found = getattr(brain_or_object, attr, _marker)
        if found is _marker:
            if has_default:
                return default
            fail("Attribute '{}' not found.".format(attr))
        # Callables are called without arguments
        return found() if callable(found) else found
    except Unauthorized:
        if has_default:
            return default
        fail("You are not authorized to access '{}' of '{}'.".format(
            attr, repr(brain_or_object)))
| 683 | |||
| 684 | |||
def get_portal_catalog():
    """Return the tool registered as `portal_catalog`

    :returns: Portal Catalog Tool
    """
    catalog_tool = get_tool("portal_catalog")
    return catalog_tool
| 691 | |||
| 692 | |||
def get_review_history(brain_or_object, rev=True):
    """Get the review history for the given brain or context.

    A `WorkflowException` raised while reading the history is logged and an
    empty history is returned. A result that is neither a list nor a tuple
    is logged and replaced by an empty history as well.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: If True, the order of the history entries is reversed
    :type rev: bool
    :returns: Workflow history
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    review_history = []
    try:
        workflow = get_tool("portal_workflow")
        review_history = workflow.getInfoFor(obj, 'review_history')
    except WorkflowException as e:
        message = str(e)
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, message))
    if not isinstance(review_history, (list, tuple)):
        logger.error("get_review_history: expected list, recieved {}".format(
            review_history))
        review_history = []
    # Always continue with a new list: a tuple passes the check above but has
    # no `reverse` method, and reversing in place would alter the sequence
    # handed out by the workflow tool.
    review_history = list(review_history)
    if rev is True:
        review_history.reverse()
    return review_history
| 717 | |||
| 718 | |||
def get_revision_history(brain_or_object):
    """Get the full content history for the given brain or context.

    The history is read from a `ContentHistoryView` created for the object
    and its `REQUEST` (None if the object has no request).

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory`
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
| 730 | |||
| 731 | |||
def get_workflows_for(brain_or_object):
    """Get the workflow chain assigned to the given brain or context.

    Note: A portal_type name (string) is supported as parameter, too.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    wf_tool = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    # Strings are handed over as they are, anything else is resolved to the
    # content object first
    if not isinstance(target, basestring):
        target = get_object(target)
    return wf_tool.getChainFor(target)
| 747 | |||
| 748 | |||
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Get the value of a workflow state variable of the brain or context.

    An empty string is passed to the workflow tool as default value.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status
    :rtype: str
    """
    wf_tool = get_tool("portal_workflow")
    content = get_object(brain_or_object)
    status = wf_tool.getInfoFor(ob=content, name=state_var, default='')
    return status
| 762 | |||
| 763 | |||
def get_creation_date(brain_or_object):
    """Get the creation date of the brain or object

    The `created` attribute is read; if it is callable, it is called and the
    result is returned. `fail` is called if the attribute is missing or None.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    """
    creation = getattr(brain_or_object, "created", None)
    if creation is None:
        fail("Object {} has no creation date ".format(
            repr(brain_or_object)))
    return creation() if callable(creation) else creation
| 779 | |||
| 780 | |||
def get_modification_date(brain_or_object):
    """Get the modification date of the brain or object

    The `modified` attribute is read; if it is callable, it is called and the
    result is returned. `fail` is called if the attribute is missing or None.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    modification = getattr(brain_or_object, "modified", None)
    if modification is None:
        fail("Object {} has no modification date ".format(
            repr(brain_or_object)))
    return modification() if callable(modification) else modification
| 796 | |||
| 797 | |||
def get_review_status(brain_or_object):
    """Get the `review_state` of an object

    For brains the `review_state` attribute is returned directly, otherwise
    the workflow status is looked up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_state variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
| 809 | |||
| 810 | |||
def get_cancellation_status(brain_or_object, default="active"):
    """Get the `cancellation_state` of an object

    The state is resolved in this order:

    1. the `cancellation_state` attribute, if a brain is passed in and the
       attribute is not None
    2. the `cancellation_state` workflow variable, if the workflow
       `bika_cancellation_workflow` is assigned
    3. the default workflow status of the object, if it is either "active"
       or "cancelled"; otherwise the `default`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned if no cancellation state can be derived
    :type default: String
    :returns: Value of the cancellation state
    :rtype: String
    """
    if is_brain(brain_or_object):
        state = getattr(brain_or_object, "cancellation_state", None)
        if state is not None:
            return state

    workflows = get_workflows_for(brain_or_object)
    if "bika_cancellation_workflow" in workflows:
        return get_workflow_status_of(brain_or_object, "cancellation_state")

    # No dedicated cancellation workflow assigned: derive the state from the
    # default workflow status. "cancelled" has to pass through here, because
    # `is_active` compares the result against it; "inactive" is not a
    # cancellation state and therefore maps to the default.
    state = get_workflow_status_of(brain_or_object)
    if state not in ("active", "cancelled"):
        return default
    return state
| 833 | |||
| 834 | |||
def get_inactive_status(brain_or_object, default="active"):
    """Get the `inactive_state` of an object

    The state is taken from the `inactive_state` attribute of a brain if it
    is not None, else from the `inactive_state` workflow variable if the
    workflow `bika_inactive_workflow` is assigned, else from the default
    workflow status if that is "active" or "inactive".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned if no inactive state can be derived
    :type default: String
    :returns: Value of the inactive state
    :rtype: String
    """
    if is_brain(brain_or_object):
        brain_state = getattr(brain_or_object, "inactive_state", None)
        if brain_state is not None:
            return brain_state

    if "bika_inactive_workflow" in get_workflows_for(brain_or_object):
        return get_workflow_status_of(brain_or_object, "inactive_state")

    # Fall back to the default workflow status
    status = get_workflow_status_of(brain_or_object)
    return status if status in ("active", "inactive") else default
| 857 | |||
| 858 | |||
def is_active(brain_or_object):
    """Check that the object is neither 'inactive' nor 'cancelled'.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    # The cancellation status is only looked up if the object is not inactive
    return not (
        get_inactive_status(brain_or_object) == "inactive"
        or get_cancellation_status(brain_or_object) == "cancelled"
    )
| 872 | |||
| 873 | |||
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Get all registered catalogs for the given portal_type, catalog brain or
    content object

    The catalogs are looked up in the `archetype_tool`. If the tool is not
    available or no catalog is registered, a list containing only the
    `default` catalog tool is returned.

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog tool used as fallback
    :type default: str
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", None)
    if not at_tool:
        return [get_tool(default)]

    registered = []

    # Lookup by the portal type of the brain/object ...
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        registered = at_tool.getCatalogsByType(portal_type)
    # ... or by the portal type name passed in directly
    if isinstance(brain_or_object, basestring):
        registered = at_tool.getCatalogsByType(brain_or_object)

    return registered or [get_tool(default)]
| 900 | |||
| 901 | |||
def get_transitions_for(brain_or_object):
    """Collect the workflow transitions of all assigned workflows

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: All possible available and allowed transitions
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    obj = get_object(brain_or_object)
    for workflow_id in get_workflows_for(brain_or_object):
        candidates = wf_tool[workflow_id].getTransitionsFor(obj)
        # Only add transitions not already collected from previous workflows
        fresh = [item for item in candidates if item not in collected]
        collected.extend(fresh)
    return collected
| 918 | |||
| 919 | |||
def do_transition_for(brain_or_object, transition):
    """Perform a workflow transition on the passed in object.

    A `BikaBeforeTransitionEvent` is notified before and a
    `BikaAfterTransitionEvent` after the transition. If the transition raises
    an `InvalidParameterError`, a `BikaTransitionFailedEvent` is notified and
    `fail` is called.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: The id of the transition to perform
    :type transition: str
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, basestring):
        fail("Transition type needs to be string, got '%s'" % type(transition))

    target = get_object(brain_or_object)

    # announce the upcoming transition
    notify(BikaBeforeTransitionEvent(target, transition))
    try:
        ploneapi.content.transition(target, transition)
    except ploneapi.exc.InvalidParameterError as error:
        # announce the failure before reporting it
        notify(BikaTransitionFailedEvent(target, transition, exception=error))
        fail("Failed to perform transition '{}' on {}: {}".format(
            transition, target, str(error)))

    # announce the performed transition
    notify(BikaAfterTransitionEvent(target, transition))
    return target
| 942 | |||
| 943 | |||
def get_roles_for_permission(permission, brain_or_object):
    """Get the sorted, duplicate-free roles for the permission on the object.

    :param permission: The permission to get the roles for
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Roles for the given Permission
    :rtype: list
    """
    content = get_object(brain_or_object)
    granted = rolesForPermissionOn(permission, content)
    return sorted(set(granted))
| 955 | |||
| 956 | |||
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object has to support
    :type policy: str
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # Honor the `policy` argument instead of a hard-coded policy id; the
    # default keeps the previous behavior.
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
| 969 | |||
| 970 | |||
def get_version(brain_or_object):
    """Get the version of the current object

    The `version_id` is read from the object without acquisition wrapper;
    0 is returned if a versionable object has no such attribute.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object, or None if not available
    :rtype: int or None
    """
    content = get_object(brain_or_object)
    if is_versionable(content):
        return getattr(aq_base(content), "version_id", 0)
    return None
| 983 | |||
| 984 | |||
def get_view(name, context=None, request=None):
    """Get the view by name

    The view is looked up as a multi adapter for the context object and the
    request. The portal is used if no context is given, the global request
    if no request is given.

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view
    :type request: HTTPRequest object
    :returns: The view registered under the given name
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    request = request or get_request() or None
    view_context = get_object(context)
    return getMultiAdapter((view_context, request), name=name)
| 1000 | |||
| 1001 | |||
def get_request():
    """Return the request provided by `globalrequest.getRequest`

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    current_request = globalrequest.getRequest()
    return current_request
| 1009 | |||
| 1010 | |||
def get_group(group_or_groupname):
    """Return Plone Group

    Objects having a `_getGroup` attribute are returned as they are; any
    other non-empty value is looked up by id in the `portal_groups` tool.

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData, or None if nothing was passed in
    """
    if not group_or_groupname:
        return None
    # Already a group object
    if hasattr(group_or_groupname, "_getGroup"):
        return group_or_groupname
    return get_tool("portal_groups").getGroupById(group_or_groupname)
| 1025 | |||
| 1026 | |||
def get_user(user_or_username):
    """Return Plone User

    Objects having a `getUserId` method are looked up again by their user id;
    any other non-empty value is used as the user id itself.

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None if nothing was passed in
    """
    if not user_or_username:
        return None
    if not hasattr(user_or_username, "getUserId"):
        return ploneapi.user.get(userid=user_or_username)
    return ploneapi.user.get(user_or_username.getUserId())
| 1038 | |||
| 1039 | |||
def get_user_properties(user_or_username):
    """Return User Properties

    The property items of all property sheets of the user are merged into a
    single dictionary.

    :param user_or_username: Plone user or user id
    :returns: Dictionary of user properties, empty if no user was found
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}

    properties = {}
    principal = member.getUser()
    for sheet_id in principal.listPropertysheets():
        sheet = principal.getPropertysheet(sheet_id)
        properties.update(dict(sheet.propertyItems()))
    return properties
| 1057 | |||
| 1058 | |||
def get_users_by_roles(roles=None):
    """Search Plone users by their roles

    A value that is neither a list nor a tuple is wrapped into a list before
    it is passed to the membership tool.

    :param roles: Plone role name or list of roles
    :type roles: list/str
    :returns: List of Plone users having the role(s)
    """
    wanted_roles = roles if isinstance(roles, (tuple, list)) else [roles]
    membership = get_tool("portal_membership")
    return membership.searchForMembers(roles=wanted_roles)
| 1070 | |||
| 1071 | |||
def get_current_user():
    """Return the user provided by `ploneapi.user.get_current`

    :returns: Current User
    """
    current = ploneapi.user.get_current()
    return current
| 1078 | |||
| 1079 | |||
def get_user_contact(user, contact_types=None):
    """Returns the associated contact of a Plone user

    If the user passed in has no contact associated, return None. None is
    returned as well (and an error is logged) if more than one contact is
    bound to the user.
    The `contact_types` parameter filters the portal types for the search.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search.
        Defaults to ['Contact', 'LabContact'].
    :returns: Contact associated to the Plone user or None
    """
    if not user:
        return None

    # Build the default per call instead of sharing one mutable list object
    # between all calls.
    if contact_types is None:
        contact_types = ['Contact', 'LabContact']

    query = {'portal_type': contact_types, 'getUsername': user.id}
    brains = search(query, catalog='portal_catalog')
    if not brains:
        return None

    if len(brains) > 1:
        # Oops, the user has multiple contacts assigned, return None
        contacts = [brain.Title for brain in brains]
        err_msg = "User '{}' is bound to multiple Contacts '{}'"
        err_msg = err_msg.format(user.id, ','.join(contacts))
        logger.error(err_msg)
        return None

    return get_object(brains[0])
| 1107 | |||
| 1108 | |||
def get_user_client(user_or_contact):
    """Returns the client of the contact of a Plone user

    If the user passed in has no contact or does not belong to any client,
    returns None.

    :param user_or_contact: Plone user or contact
    :returns: Client the contact of the Plone user belongs to
    """
    # Lab contacts cannot belong to a client
    if not user_or_contact or ILabContact.providedBy(user_or_contact):
        return None

    if IContact.providedBy(user_or_contact):
        # The client is the parent of the contact
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # Not a contact: resolve the contact bound to the user and retry
    linked = get_user_contact(user_or_contact, contact_types=['Contact'])
    if IContact.providedBy(linked):
        return get_user_client(linked)
    return None
| 1133 | |||
| 1134 | |||
def get_current_client():
    """Returns the client of the current logged in user, if any

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
| 1141 | |||
| 1142 | |||
def get_cache_key(brain_or_object):
    """Generate a cache key for a common brain or object

    The key is built from the portal type, id, UID, URL and the `micros()`
    value of the modification date, joined by dashes.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    parts = (
        get_portal_type(brain_or_object),
        get_id(brain_or_object),
        get_uid(brain_or_object),
        # the URL keeps keys of different domains apart
        get_url(brain_or_object),
        # changes whenever the object gets modified
        get_modification_date(brain_or_object).micros(),
    )
    return "-".join(str(part) for part in parts)
| 1161 | |||
| 1162 | |||
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object as argument

    :param method: The method to cache (not used for the key)
    :param self: The instance the method is bound to (not used for the key)
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
| 1174 | |||
| 1175 | |||
def normalize_id(string):
    """Normalize the id with the `IIDNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, basestring):
        fail("Type of argument must be string, found '{}'".format(type(string)))
    # look up the id normalizer utility and apply it
    return getUtility(IIDNormalizer).normalize(string)
| 1189 | |||
| 1190 | |||
def normalize_filename(string):
    """Normalize the filename with the `IFileNameNormalizer` utility

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, basestring):
        fail("Type of argument must be string, found '{}'".format(type(string)))
    # look up the file name normalizer utility and apply it
    return getUtility(IFileNameNormalizer).normalize(string)
| 1204 | |||
| 1205 | |||
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    :param uid: The uid to check
    :param validate: If False, checks if uid is a string of 32 characters
        that matches `UID_RX`. If True, also verifies if a brain exists in
        the `uid_catalog` for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    well_formed = (
        isinstance(uid, basestring)
        and len(uid) == 32
        and UID_RX.match(uid)
    )
    if not well_formed:
        return False
    if not validate:
        return True

    # Check if a brain for this uid exists
    uid_catalog = get_tool('uid_catalog')
    matches = uid_catalog(UID=uid)
    if matches:
        assert (len(matches) == 1)
    return len(matches) > 0
| 1231 | |||
| 1232 | |||
def is_date(date):
    """Tell whether the passed in value is a date object

    Both Zope's `DateTime` and Python's `datetime` instances are accepted.
    Any falsy value (None, empty string, 0, ...) is rejected right away.

    :param date: The value to check
    :type date: DateTime or datetime
    :return: True if the value is a date object
    :rtype: bool
    """
    # `bool(...)` keeps the result a real boolean for falsy inputs
    return bool(date) and isinstance(date, (DateTime, datetime))
            ||
| 1244 | |||
| 1245 | |||
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Fallback that is converted and returned when `value` is
        empty or cannot be converted. With the default of None, None is
        returned in those cases.
    :return: The DateTime representation of the value passed in, the
        DateTime representation of default, or None
    """
    if isinstance(value, DateTime):
        return value
    if not value:
        if default is None:
            return None
        return to_date(default)
    try:
        if isinstance(value, str) and '.' in value:
            # Dotted dates are parsed day-first, see:
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
            return DateTime(value, datefmt='international')
        return DateTime(value)
    except Exception:
        # Best-effort conversion: any parsing error falls back to the
        # default. `Exception` (instead of a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        return to_date(default)
            ||
| 1266 | |||
| 1267 | |||
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    :param value: The value to be converted to an int
    :param default: Fallback used when the value cannot be converted. None
        is returned as-is; any other value is itself converted with
        `to_int`. When omitted, the failure is reported through `fail`.
    :return: The resulting int or default
    """
    if is_floatable(value):
        # go through float first, so that numeric strings such as "1.9"
        # are truncated instead of being rejected by int()
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is raised by int() for infinite floats (e.g. the
        # floatable string "inf"); ValueError covers NaN and bad strings
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
            ||
| 1285 | |||
| 1286 | |||
def is_floatable(value):
    """Tell whether the passed in value can be converted to a float

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if `float(value)` succeeds, False if it raises a
        TypeError or ValueError
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    else:
        return True
            ||
| 1299 | |||
| 1300 | |||
def to_float(value, default=_marker):
    """Convert the passed in value to a float number

    :param value: The value to be converted to a float number
    :type value: str, float, int
    :param default: Fallback that is converted instead when the value is not
        floatable. When omitted, the failure is reported through `fail`.
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        # convert the fallback instead
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    # only reached if `fail` returns instead of raising
    return float(value)
            ||
| 1314 | |||
| 1315 | |||
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Empty values, `Missing.Value`, UIDs and booleans yield an empty text.
    Lists and tuples are converted item by item and the non-empty results
    are joined with a single space; for dictionaries the same is done with
    their values. Dates are rendered as "YYYY-MM-DD". Anything else is
    converted to a string.

    :param value: The raw value of the metadata column
    :returns: Searchable text for the value, empty if nothing is searchable
    """
    if not value:
        return u""
    if value is Missing.Value:
        return u""
    if is_uid(value):
        return u""
    if isinstance(value, (bool)):
        return u""
    if isinstance(value, (list, tuple)):
        # convert every item (not only the first one) and skip those that
        # do not contribute any searchable text
        texts = [to_searchable_text_metadata(item) for item in value]
        return u" ".join([text for text in texts if text])
    if isinstance(value, (dict)):
        # only the values of a dictionary are considered searchable
        return to_searchable_text_metadata(list(value.values()))
    if is_date(value):
        return value.strftime("%Y-%m-%d")
    if not isinstance(value, basestring):
        value = str(value)
    return safe_unicode(value)
            ||
| 1341 |