| Total Complexity | 218 |
| Total Lines | 1369 |
| Duplicated Lines | 3.21 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like bika.lims.api often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | # |
||
| 3 | # This file is part of SENAITE.CORE |
||
| 4 | # |
||
| 5 | # Copyright 2018 by its authors. |
||
| 6 | # Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst. |
||
| 7 | |||
| 8 | import Missing |
||
| 9 | import re |
||
| 10 | |||
| 11 | from Acquisition import aq_base |
||
| 12 | from AccessControl.PermissionRole import rolesForPermissionOn |
||
| 13 | |||
| 14 | from datetime import datetime |
||
| 15 | from datetime import timedelta |
||
| 16 | from DateTime import DateTime |
||
| 17 | |||
| 18 | from Products.CMFPlone.utils import base_hasattr, safe_unicode |
||
| 19 | from Products.CMFCore.interfaces import ISiteRoot |
||
| 20 | from Products.CMFCore.interfaces import IFolderish |
||
| 21 | from Products.Archetypes.BaseObject import BaseObject |
||
| 22 | from Products.ZCatalog.interfaces import ICatalogBrain |
||
| 23 | from Products.CMFPlone.utils import _createObjectByType |
||
| 24 | from Products.CMFCore.WorkflowCore import WorkflowException |
||
| 25 | from Products.CMFCore.utils import getToolByName |
||
| 26 | from bika.lims.interfaces import IClient, IContact, ILabContact |
||
| 27 | |||
| 28 | from zope import globalrequest |
||
| 29 | from zope.event import notify |
||
| 30 | from zope.interface import implements |
||
| 31 | from zope.component import getUtility |
||
| 32 | from zope.component import getMultiAdapter |
||
| 33 | from zope.component.interfaces import IFactory |
||
| 34 | from zope.component.interfaces import ObjectEvent |
||
| 35 | from zope.component.interfaces import IObjectEvent |
||
| 36 | from zope.lifecycleevent import modified |
||
| 37 | from zope.lifecycleevent import ObjectCreatedEvent |
||
| 38 | from zope.security.interfaces import Unauthorized |
||
| 39 | |||
| 40 | from plone import api as ploneapi |
||
| 41 | from plone.memoize.volatile import DontCache |
||
| 42 | from plone.api.exc import InvalidParameterError |
||
| 43 | from plone.dexterity.interfaces import IDexterityContent |
||
| 44 | from plone.app.layout.viewlets.content import ContentHistoryView |
||
| 45 | from plone.i18n.normalizer.interfaces import IFileNameNormalizer |
||
| 46 | from plone.i18n.normalizer.interfaces import IIDNormalizer |
||
| 47 | |||
| 48 | from bika.lims import logger |
||
| 49 | |||
| 50 | """Bika LIMS Framework API |
||
| 51 | |||
| 52 | Please see bika.lims/docs/API.rst for documentation. |
||
| 53 | |||
| 54 | Architectural Notes: |
||
| 55 | |||
| 56 | Please add only functions that do a single thing for a single object. |
||
| 57 | |||
| 58 | Good: `def get_foo(brain_or_object)` |
||
| 59 | Bad: `def get_foos(list_of_brain_objects)` |
||
| 60 | |||
| 61 | Why? |
||
| 62 | |||
| 63 | Because it makes things more complex. You can always use a pattern like this to |
||
| 64 | achieve the same:: |
||
| 65 | |||
| 66 | >>> foos = map(get_foo, list_of_brain_objects) |
||
| 67 | |||
| 68 | Please add for all of your functions a descriptive test in docs/API.rst. |
||
| 69 | |||
| 70 | Thanks. |
||
| 71 | """ |
||
| 72 | |||
# Unique sentinel used as default value of optional arguments, so that an
# explicitly passed `None` can be told apart from "argument not given".
_marker = object()

# Compiled pattern for 32 lowercase alphanumeric characters followed by the
# end of the string. The pattern itself carries no leading anchor.
UID_RX = re.compile("[a-z0-9]{32}$")
||
| 76 | |||
| 77 | |||
class BikaLIMSError(Exception):
    """Common base class for errors raised by bika.lims."""
||
| 80 | |||
| 81 | |||
class IBikaTransitionEvent(IObjectEvent):
    """Base interface of all Bika workflow transition events"""
||
| 84 | |||
| 85 | |||
class IBikaBeforeTransitionEvent(IBikaTransitionEvent):
    """Event interface: fired before the transition is invoked"""
||
| 88 | |||
| 89 | |||
class IBikaAfterTransitionEvent(IBikaTransitionEvent):
    """Event interface: fired after the transition is done"""
||
| 92 | |||
| 93 | |||
class IBikaTransitionFailedEvent(IBikaTransitionEvent):
    """Event interface: fired if the transition failed"""
||
| 96 | |||
| 97 | |||
class BikaTransitionEvent(ObjectEvent):
    """Object event describing a Bika workflow transition

    :param obj: The object the event is about
    :param transition: The transition as handed in by the caller
    :param exception: Optional exception to attach to the event
    """

    def __init__(self, obj, transition, exception=None):
        ObjectEvent.__init__(self, obj)
        # keep the object reachable as `obj` as well
        self.obj = obj
        self.transition, self.exception = transition, exception
||
| 105 | |||
| 106 | |||
class BikaBeforeTransitionEvent(BikaTransitionEvent):
    """Transition event providing `IBikaBeforeTransitionEvent`"""
    implements(IBikaBeforeTransitionEvent)
||
| 109 | |||
| 110 | |||
class BikaAfterTransitionEvent(BikaTransitionEvent):
    """Transition event providing `IBikaAfterTransitionEvent`"""
    implements(IBikaAfterTransitionEvent)
||
| 113 | |||
| 114 | |||
class BikaTransitionFailedEvent(BikaTransitionEvent):
    """Transition event providing `IBikaTransitionFailedEvent`"""
    implements(IBikaTransitionFailedEvent)
||
| 117 | |||
| 118 | |||
def get_portal():
    """Return the portal object

    :returns: The site as looked up by `plone.api.portal.getSite`
    """
    portal = ploneapi.portal.getSite()
    return portal
||
| 125 | |||
| 126 | |||
def get_bika_setup():
    """Return the `bika_setup` folder of the portal

    :returns: Whatever `portal.get("bika_setup")` yields
    """
    return get_portal().get("bika_setup")
||
| 132 | |||
| 133 | |||
def create(container, portal_type, *args, **kwargs):
    """Creates an object in Bika LIMS

    This code uses most of the parts from the TypesTool
    see: `Products.CMFCore.TypesTool._constructInstance`

    :param container: container
    :type container: ATContentType/DexterityContentType/CatalogBrain
    :param portal_type: The portal type to create, e.g. "Client"
    :type portal_type: string
    :param title: The title for the new content object. Defaults to
        "New <portal_type>" if not given or None
    :type title: string
    :returns: The new created object
    :raises BikaLIMSError: if no type information is registered for
        `portal_type`
    """
    from bika.lims.utils import tmpID

    # A catalog brain is a documented container type, but it can not hold
    # objects itself -> wake up the real container first
    if is_brain(container):
        container = container.getObject()

    if kwargs.get("title") is None:
        kwargs["title"] = "New {}".format(portal_type)

    # generate a temporary ID
    tmp_id = tmpID()

    # get the fti
    types_tool = get_tool("portal_types")
    fti = types_tool.getTypeInfo(portal_type)
    if fti is None:
        # fail early with a readable message instead of an AttributeError
        fail("No type information found for portal type '{}'"
             .format(portal_type))

    if fti.product:
        obj = _createObjectByType(portal_type, container, tmp_id)
    else:
        # newstyle factory
        factory = getUtility(IFactory, fti.factory)
        obj = factory(tmp_id, *args, **kwargs)
        if hasattr(obj, '_setPortalTypeName'):
            obj._setPortalTypeName(fti.getId())
        notify(ObjectCreatedEvent(obj))
        # notifies ObjectWillBeAddedEvent, ObjectAddedEvent and
        # ContainerModifiedEvent
        container._setObject(tmp_id, obj)
        # we get the object here with the current object id, as it might be
        # renamed already by an event handler
        obj = container._getOb(obj.getId())

    # handle AT Content
    if is_at_content(obj):
        obj.processForm()

    # Edit after processForm; processForm does AT unmarkCreationFlag.
    obj.edit(**kwargs)

    # explicit notification
    modified(obj)
    return obj
||
| 184 | |||
| 185 | |||
def get_tool(name, context=None, default=_marker):
    """Look up a portal tool by its name

    :param name: The name of the tool, e.g. `portal_catalog`
    :type name: string
    :param context: Optional brain or object used for the lookup
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param default: Returned if the tool can not be found via plone.api
    :returns: Portal Tool
    :raises BikaLIMSError: if no tool was found and no default was given
    """

    # Without a context, ask the plone api directly
    if context is None:
        try:
            return ploneapi.portal.get_tool(name)
        except InvalidParameterError:
            if default is _marker:
                fail("No tool named '%s' found." % name)
            return default

    # Context given: try the context first, retry without it on failure
    try:
        context = get_object(context)
        return getToolByName(context, name)
    except (BikaLIMSError, AttributeError) as e:
        # https://github.com/senaite/bika.lims/issues/396
        logger.warn("get_tool::getToolByName({}, '{}') failed: {} "
                    "-> falling back to plone.api.portal.get_tool('{}')"
                    .format(repr(context), name, repr(e), name))
        return get_tool(name, default=default)
||
| 215 | |||
| 216 | |||
def fail(msg=None):
    """Raise a `BikaLIMSError` carrying the given message

    :param msg: Error message; a generic text is used if it is None
    :raises BikaLIMSError: always
    """
    reason = "Reason not given." if msg is None else msg
    raise BikaLIMSError("{}".format(reason))
||
| 223 | |||
| 224 | |||
def is_object(brain_or_object):
    """Tell whether the passed in object is supported by this API

    Supported are the portal root, AT contents, dexterity contents and
    catalog brains.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: Portal Object
    :returns: True if the passed in object is a valid portal content
    :rtype: bool
    """
    checks = (is_portal, is_at_content, is_dexterity_content, is_brain)
    return any(check(brain_or_object) for check in checks)
||
| 241 | |||
| 242 | |||
def get_object(brain_or_object):
    """Return the full content object, waking up catalog brains

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: PortalObject/ATContentType/DexterityContentType
        /CatalogBrain
    :returns: The full object
    :raises BikaLIMSError: if the passed in value is not supported
    """
    if not is_object(brain_or_object):
        fail("{} is not supported.".format(repr(brain_or_object)))
    if not is_brain(brain_or_object):
        # already a full object
        return brain_or_object
    return brain_or_object.getObject()
||
| 256 | |||
| 257 | |||
def is_portal(brain_or_object):
    """Tell whether the passed in object provides `ISiteRoot`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is the portal root object
    :rtype: bool
    """
    provides_site_root = ISiteRoot.providedBy(brain_or_object)
    return provides_site_root
||
| 267 | |||
| 268 | |||
def is_brain(brain_or_object):
    """Tell whether the passed in object provides `ICatalogBrain`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a catalog brain
    :rtype: bool
    """
    provides_brain = ICatalogBrain.providedBy(brain_or_object)
    return provides_brain
||
| 278 | |||
| 279 | |||
def is_dexterity_content(brain_or_object):
    """Tell whether the passed in object provides `IDexterityContent`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is a dexterity content type
    :rtype: bool
    """
    provides_dexterity = IDexterityContent.providedBy(brain_or_object)
    return provides_dexterity
||
| 289 | |||
| 290 | |||
def is_at_content(brain_or_object):
    """Tell whether the passed in object is an Archetypes `BaseObject`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is an AT content type
    :rtype: bool
    """
    is_archetype = isinstance(brain_or_object, BaseObject)
    return is_archetype
||
| 300 | |||
| 301 | |||
def is_folderish(brain_or_object):
    """Tell whether the passed in object is folderish

    An `is_folderish` attribute on the passed in value wins (it is called
    if it is callable). Otherwise the full object is checked for
    `IFolderish`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: True if the object is folderish
    :rtype: bool
    """
    if not hasattr(brain_or_object, "is_folderish"):
        return IFolderish.providedBy(get_object(brain_or_object))
    flag = brain_or_object.is_folderish
    return flag() if callable(flag) else flag
||
| 315 | |||
| 316 | |||
def get_portal_type(brain_or_object):
    """Return the `portal_type` attribute of the brain or object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Portal type
    :rtype: string
    :raises BikaLIMSError: if the passed in value is not supported
    """
    if is_object(brain_or_object):
        return brain_or_object.portal_type
    fail("{} is not supported.".format(repr(brain_or_object)))
||
| 328 | |||
| 329 | |||
def get_schema(brain_or_object):
    """Return the schema of the content

    Dexterity contents get their schema from the type information, AT
    contents from their `Schema()` method.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Schema object
    :raises BikaLIMSError: for the portal root and for objects which are
        neither dexterity nor AT contents
    """
    obj = get_object(brain_or_object)
    if is_portal(obj):
        fail("get_schema can't return schema of portal root")
    if is_dexterity_content(obj):
        fti = get_tool("portal_types").getTypeInfo(obj.portal_type)
        return fti.lookupSchema()
    if not is_at_content(obj):
        fail("{} has no Schema.".format(brain_or_object))
    return obj.Schema()
||
| 347 | |||
| 348 | |||
def get_fields(brain_or_object):
    """Return the schema fields of the object, keyed by field name

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Mapping of field name -> field
    :rtype: dict
    """
    obj = get_object(brain_or_object)
    schema = get_schema(obj)
    if not is_dexterity_content(obj):
        return dict(zip(schema.keys(), schema.fields()))
    # dexterity: look up each field by the names the schema reports
    return {name: schema.get(name) for name in schema.names()}
||
| 364 | |||
| 365 | |||
def get_id(brain_or_object):
    """Return the Plone ID of the brain or object

    The `getId` attribute of a brain is used if available, so that the
    object does not need to be woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone ID
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getId")
    if from_brain:
        return brain_or_object.getId
    return get_object(brain_or_object).getId()
||
| 377 | |||
| 378 | |||
def get_title(brain_or_object):
    """Return the title of the brain or object

    The `Title` attribute of a brain is used if available, so that the
    object does not need to be woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Title
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Title")
    if from_brain:
        return brain_or_object.Title
    return get_object(brain_or_object).Title()
||
| 390 | |||
| 391 | |||
def get_description(brain_or_object):
    """Return the description of the brain or object

    The `Description` attribute of a brain is used if available, so that
    the object does not need to be woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Description
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "Description")
    if from_brain:
        return brain_or_object.Description
    return get_object(brain_or_object).Description()
||
| 404 | |||
| 405 | |||
def get_uid(brain_or_object):
    """Return the Plone UID of the brain or object

    The portal root has the fixed UID '0'. The `UID` attribute of a brain
    is used if available, so that the object does not need to be woken up.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Plone UID
    :rtype: string
    """
    if is_portal(brain_or_object):
        return '0'
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "UID")
    if from_brain:
        return brain_or_object.UID
    return get_object(brain_or_object).UID()
||
| 419 | |||
| 420 | |||
def get_url(brain_or_object):
    """Return the absolute URL of the brain or object

    Brains answer with `getURL()` if they have it, everything else with
    `absolute_url()` of the full object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Absolute URL
    :rtype: string
    """
    from_brain = is_brain(brain_or_object) \
        and base_hasattr(brain_or_object, "getURL")
    if from_brain:
        return brain_or_object.getURL()
    return get_object(brain_or_object).absolute_url()
||
| 432 | |||
| 433 | |||
def get_icon(brain_or_object, html_tag=True):
    """Get the icon of the content object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param html_tag: A value of 'True' returns the HTML tag, else the image url
    :type html_tag: bool
    :returns: HTML '<img>' tag if 'html_tag' is True else the image url.
        An empty string if the type has no icon. The title is HTML
        escaped inside the tag.
    :rtype: string
    """
    # Local import keeps the block working on Python 2 (cgi) and 3 (html)
    try:
        from html import escape
    except ImportError:
        from cgi import escape

    # Manual approach, because `plone.app.layout.getIcon` does not reliably
    # work for Bika Contents coming from other catalogs than the
    # `portal_catalog`
    portal_types = get_tool("portal_types")
    fti = portal_types.getTypeInfo(brain_or_object.portal_type)
    icon = fti.getIcon()
    if not icon:
        return ""
    url = "%s/%s" % (get_url(get_portal()), icon)
    if not html_tag:
        return url
    # The title is user provided content: escape it (including quotes), so
    # that it can neither break out of the attribute nor inject markup
    title = escape(get_title(brain_or_object), True)
    tag = '<img width="16" height="16" src="{url}" title="{title}" />'.format(
        url=url, title=title)
    return tag
||
| 458 | |||
| 459 | |||
def get_object_by_uid(uid, default=_marker):
    """Find an object by a given UID

    The `uid_catalog` is asked first, then the portal catalog.

    :param uid: The UID of the object to find
    :type uid: string
    :param default: Returned instead of failing if the UID is empty or no
        object was found
    :returns: Found Object or the default
    :raises BikaLIMSError: if nothing was found and no default was given
    """

    # nothing to do here
    if not uid:
        if default is _marker:
            fail("get_object_by_uid requires UID as first argument; "
                 "got {} instead".format(uid))
        return default

    # we defined the portal object UID to be '0'::
    if uid == '0':
        return get_portal()

    # we try to find the object with both catalogs
    pc = get_portal_catalog()
    uc = get_tool("uid_catalog")

    # the reference catalog comes first
    uid_brains = uc(UID=uid)
    if uid_brains:
        return uid_brains[0].getObject()

    # the portal catalog is the second chance
    portal_brains = pc(UID=uid)
    if portal_brains:
        return get_object(portal_brains[0])

    if default is _marker:
        fail("No object found for UID {}".format(uid))
    return default
||
| 496 | |||
| 497 | |||
def get_object_by_path(path, default=_marker):
    """Find an object by a given physical path or absolute_url

    :param path: The physical path (or absolute URL) of the object to find
    :type path: string
    :param default: Returned instead of failing if the path is empty, lies
        outside of the portal or no object was found
    :returns: Found Object or the default
    :raises BikaLIMSError: if nothing was found and no default was given
    """

    # nothing to do here
    if not path:
        if default is _marker:
            fail("get_object_by_path first argument must be a path; "
                 "{} received".format(path))
        return default

    pc = get_portal_catalog()
    portal = get_portal()
    portal_path = get_path(portal)
    portal_url = get_url(portal)

    # absolute URLs of the portal are converted to a physical path
    if path.startswith(portal_url):
        path = "/".join(get_request().physicalPathFromURL(path))

    if not path.startswith(portal_path):
        if default is _marker:
            fail("Not a physical path inside the portal.")
        return default

    # the portal itself needs no catalog lookup
    if path == portal_path:
        return portal

    found = pc(path=dict(query=path, depth=0))
    if found:
        return get_object(found[0])

    if default is _marker:
        fail("Object at path '{}' not found".format(path))
    return default
||
| 537 | |||
| 538 | |||
def get_path(brain_or_object):
    """Return the physical path of the brain or object as string

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the object
    :rtype: string
    """
    if not is_brain(brain_or_object):
        segments = get_object(brain_or_object).getPhysicalPath()
        return "/".join(segments)
    return brain_or_object.getPath()
||
| 550 | |||
| 551 | |||
def get_parent_path(brain_or_object):
    """Return the physical path of the parent of the brain or object

    The portal root answers with its own path. For brains, the last
    segment of the brain path is cut off; objects are asked for their
    `aq_parent`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Physical path of the parent object
    :rtype: string
    """
    if is_portal(brain_or_object):
        return get_path(get_portal())
    if is_brain(brain_or_object):
        parent_path, _, _ = get_path(brain_or_object).rpartition("/")
        return parent_path
    parent = get_object(brain_or_object).aq_parent
    return get_path(parent)
||
| 566 | |||
| 567 | |||
def get_parent(brain_or_object, catalog_search=False):
    """Locate the parent object of the content/catalog brain

    With `catalog_search` switched on, the `portal_catalog` is queried for
    the parent path and the found brain is returned instead of the full
    parent object. If the query gives no results, the full parent object
    is returned nevertheless. The portal root is its own parent.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param catalog_search: Use a catalog query to find the parent object
    :type catalog_search: bool
    :returns: parent object
    :rtype: ATContentType/DexterityContentType/PloneSite/CatalogBrain
    """

    if is_portal(brain_or_object):
        return get_portal()

    # Plain acquisition parent if no catalog search is wanted
    if not catalog_search:
        return get_object(brain_or_object).aq_parent

    parent_path = get_parent_path(brain_or_object)

    # the portal object is not looked up in the catalog
    if parent_path == get_path(get_portal()):
        return get_portal()

    # query the catalog tool for the parent path
    pc = get_portal_catalog()
    results = pc(path=dict(query=parent_path, depth=0))
    if results:
        # return the brain
        return results[0]

    # No results fallback: return the parent object
    return get_object(brain_or_object).aq_parent
||
| 610 | |||
| 611 | |||
def search(query, catalog=_marker):
    """Search for objects in exactly one catalog

    If no catalog is given, the first catalog registered for each queried
    portal type is used; the portal catalog is the last resort.

    :param query: A suitable search query.
    :type query: dict
    :param catalog: A single catalog id or a list of catalog ids
    :type catalog: str/list
    :returns: Search results
    :rtype: List of ZCatalog brains
    :raises BikaLIMSError: if the query is no dictionary or more than one
        catalog would have to be queried
    """

    # query needs to be a dictionary
    if not isinstance(query, dict):
        fail("Catalog query needs to be a dictionary")

    # Portal types to query, always handled as a sequence
    portal_types = query.get("portal_type", [])
    if not isinstance(portal_types, (tuple, list)):
        portal_types = [portal_types]

    if catalog is _marker:
        # No catalog specified: take the first registered/default catalog
        # of each queried portal type
        catalogs = [
            get_catalogs_for(portal_type, default="portal_catalog")[0]
            for portal_type in portal_types
        ]
    elif isinstance(catalog, (list, tuple)):
        # User defined catalogs
        catalogs = [get_tool(catalog_id) for catalog_id in catalog]
    else:
        # Single user defined catalog
        catalogs = [get_tool(catalog)]

    # Cleanup: Avoid duplicate catalogs
    catalogs = list(set(catalogs)) or [get_portal_catalog()]

    # We only support **single** catalog queries
    if len(catalogs) > 1:
        fail("Multi Catalog Queries are not supported, please specify a catalog.")

    target_catalog = catalogs[0]
    return target_catalog(query)
||
| 658 | |||
| 659 | |||
def safe_getattr(brain_or_object, attr, default=_marker):
    """Return the value of the attribute; callables are called first

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param attr: Attribute name
    :type attr: str
    :param default: Returned instead of failing if the attribute is missing
        or access to it is unauthorized
    :returns: Attribute value
    :rtype: obj
    :raises BikaLIMSError: if the lookup fails and no default was given
    """
    try:
        value = getattr(brain_or_object, attr, _marker)
        if value is not _marker:
            return value() if callable(value) else value
        if default is _marker:
            fail("Attribute '{}' not found.".format(attr))
        return default
    except Unauthorized:
        if default is _marker:
            fail("You are not authorized to access '{}' of '{}'.".format(
                attr, repr(brain_or_object)))
        return default
||
| 684 | |||
| 685 | |||
def get_portal_catalog():
    """Return the `portal_catalog` tool

    :returns: Portal Catalog Tool
    """
    catalog = get_tool("portal_catalog")
    return catalog
||
| 692 | |||
| 693 | |||
def get_review_history(brain_or_object, rev=True):
    """Get the review history for the given brain or context.

    Workflow errors and unexpected return values of the workflow tool are
    logged and result in an empty history.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param rev: Reverse the history if (and only if) it is `True`
    :type rev: bool
    :returns: Workflow history; always a new list
    :rtype: [{}, ...]
    """
    obj = get_object(brain_or_object)
    review_history = []
    try:
        workflow = get_tool("portal_workflow")
        review_history = workflow.getInfoFor(obj, 'review_history')
    except WorkflowException as e:
        message = str(e)
        logger.error("Cannot retrieve review_history on {}: {}".format(
            obj, message))
    if not isinstance(review_history, (list, tuple)):
        logger.error("get_review_history: expected list, recieved {}".format(
            review_history))
        review_history = []
    # Work on a private list: tuples pass the check above but have no
    # `reverse()`, and reversing in place would alter the very sequence
    # handed out by the workflow tool
    review_history = list(review_history)
    if rev is True:
        review_history.reverse()
    return review_history
||
| 718 | |||
| 719 | |||
def get_revision_history(brain_or_object):
    """Return the full history reported by the `ContentHistoryView`

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Result of `ContentHistoryView.fullHistory`
    :rtype: obj
    """
    obj = get_object(brain_or_object)
    # the view is fed with the REQUEST of the object (None if missing)
    request = safe_getattr(obj, "REQUEST", None)
    history_view = ContentHistoryView(obj, request)
    return history_view.fullHistory()
||
| 731 | |||
| 732 | |||
def get_workflows_for(brain_or_object):
    """Return the workflow chain of the given brain, context or portal type

    Note: A portal type name (string) is handed over to the workflow tool
    as it is; everything else is resolved to the full object first.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Assigned Workflows
    :rtype: tuple
    """
    workflow = ploneapi.portal.get_tool("portal_workflow")
    target = brain_or_object
    if not isinstance(target, basestring):
        target = get_object(target)
    return workflow.getChainFor(target)
||
| 748 | |||
| 749 | |||
def get_workflow_status_of(brain_or_object, state_var="review_state"):
    """Return the value of a workflow state variable of the brain or context

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param state_var: The name of the state variable
    :type state_var: string
    :returns: Status; an empty string is passed as default to the
        workflow tool
    :rtype: str
    """
    workflow = get_tool("portal_workflow")
    obj = get_object(brain_or_object)
    status = workflow.getInfoFor(ob=obj, name=state_var, default='')
    return status
||
| 763 | |||
| 764 | |||
def get_creation_date(brain_or_object):
    """Return the `created` value of the brain or object

    A callable `created` attribute is called and its result returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Creation date
    :rtype: DateTime
    :raises BikaLIMSError: if `created` is missing or None
    """
    value = getattr(brain_or_object, "created", None)
    if value is None:
        fail("Object {} has no creation date ".format(
            repr(brain_or_object)))
    return value() if callable(value) else value
||
| 780 | |||
| 781 | |||
def get_modification_date(brain_or_object):
    """Return the `modified` value of the brain or object.

    `modified` may be a plain attribute or a callable; a callable is invoked
    and its result returned.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Modification date
    :rtype: DateTime
    """
    modification = getattr(brain_or_object, "modified", None)
    if modification is None:
        message = "Object {} has no modification date ".format(
            repr(brain_or_object))
        fail(message)
    return modification() if callable(modification) else modification
||
| 797 | |||
| 798 | |||
def get_review_status(brain_or_object):
    """Return the `review_state` of a brain or object.

    Brains are answered from their `review_state` attribute; everything else
    is looked up through `get_workflow_status_of`.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Value of the review_state variable
    :rtype: String
    """
    if not is_brain(brain_or_object):
        return get_workflow_status_of(
            brain_or_object, state_var="review_state")
    return brain_or_object.review_state
||
| 810 | |||
| 811 | |||
def _get_secondary_status(brain_or_object, state_var, workflow_id, default):
    """Shared lookup used by the cancellation/inactive status getters.

    Resolution order:

    1. for brains, the attribute named `state_var` when it is not None
    2. the workflow variable `state_var` when `workflow_id` is part of the
       workflow chain of the object
    3. the `review_state`, but only when it is "active" or "inactive"
    4. `default`
    """
    if is_brain(brain_or_object):
        state = getattr(brain_or_object, state_var, None)
        if state is not None:
            return state

    if workflow_id in get_workflows_for(brain_or_object):
        return get_workflow_status_of(brain_or_object, state_var)

    state = get_workflow_status_of(brain_or_object)
    if state not in ("active", "inactive"):
        return default
    return state


def get_cancellation_status(brain_or_object, default="active"):
    """Get the `cancellation_state` of an object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned when no matching state can be determined
    :type default: string
    :returns: Value of the cancellation_state variable, the review_state if
        it is "active"/"inactive", or the default
    :rtype: String
    """
    return _get_secondary_status(
        brain_or_object, "cancellation_state", "bika_cancellation_workflow",
        default)


def get_inactive_status(brain_or_object, default="active"):
    """Get the `inactive_state` of an object

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Value returned when no matching state can be determined
    :type default: string
    :returns: Value of the inactive_state variable, the review_state if it
        is "active"/"inactive", or the default
    :rtype: String
    """
    return _get_secondary_status(
        brain_or_object, "inactive_state", "bika_inactive_workflow", default)
||
| 858 | |||
| 859 | |||
def is_active(brain_or_object):
    """Check that the object is neither 'inactive' nor 'cancelled'.

    The cancellation status is only looked up when the inactive status is
    not "inactive".

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: False if the object is in the state 'inactive' or 'cancelled'
    :rtype: bool
    """
    return not (
        get_inactive_status(brain_or_object) == "inactive"
        or get_cancellation_status(brain_or_object) == "cancelled"
    )
||
| 873 | |||
| 874 | |||
def get_catalogs_for(brain_or_object, default="portal_catalog"):
    """Return the catalogs registered in the archetype tool for the given
    portal_type, catalog brain or content object.

    Falls back to a one-element list holding the `default` catalog when the
    archetype tool is unavailable or reports no catalogs.

    :param brain_or_object: The portal_type, a catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param default: Id of the catalog tool used as fallback
    :type default: string
    :returns: List of supported catalogs
    :rtype: list
    """
    at_tool = get_tool("archetype_tool", None)
    if not at_tool:
        return [get_tool(default)]

    registered = []
    if is_object(brain_or_object):
        portal_type = get_portal_type(brain_or_object)
        registered = at_tool.getCatalogsByType(portal_type)
    if isinstance(brain_or_object, basestring):
        # a string is used directly as the portal_type
        registered = at_tool.getCatalogsByType(brain_or_object)

    return registered or [get_tool(default)]
||
| 901 | |||
| 902 | |||
def get_transitions_for(brain_or_object):
    """Collect the transitions offered by every workflow in the chain of the
    given brain or object.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Transitions of all workflows, skipping those already collected
        from a previous workflow
    :rtype: list[dict]
    """
    wf_tool = get_tool('portal_workflow')
    collected = []
    instance = get_object(brain_or_object)
    for workflow_id in get_workflows_for(brain_or_object):
        candidates = wf_tool[workflow_id].getTransitionsFor(instance)
        # the filter list is built completely before it is appended
        collected += [t for t in candidates if t not in collected]
    return collected
||
| 919 | |||
| 920 | |||
def do_transition_for(brain_or_object, transition):
    """Perform a workflow transition on the passed in brain or object.

    Notifies a `BikaBeforeTransitionEvent` before and a
    `BikaAfterTransitionEvent` after the transition. If plone.api rejects
    the transition, a `BikaTransitionFailedEvent` is notified and `fail` is
    called with the error details.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param transition: Id of the transition to perform
    :type transition: string
    :returns: The object where the transition was performed
    """
    if not isinstance(transition, basestring):
        fail("Transition type needs to be string, got '%s'" % type(transition))
    instance = get_object(brain_or_object)
    # let subscribers react before the workflow state changes
    notify(BikaBeforeTransitionEvent(instance, transition))
    try:
        ploneapi.content.transition(instance, transition)
    except ploneapi.exc.InvalidParameterError as error:
        # let subscribers know that the transition was rejected
        notify(BikaTransitionFailedEvent(
            instance, transition, exception=error))
        message = "Failed to perform transition '{}' on {}: {}".format(
            transition, instance, str(error))
        fail(message)
    # let subscribers react after the transition
    notify(BikaAfterTransitionEvent(instance, transition))
    return instance
||
| 943 | |||
| 944 | |||
def get_roles_for_permission(permission, brain_or_object):
    """Return the roles that hold the given permission on the object.

    :param permission: The permission to look up
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Sorted, duplicate-free roles for the given permission
    :rtype: list
    """
    granted = rolesForPermissionOn(permission, get_object(brain_or_object))
    return sorted(set(granted))
||
| 956 | |||
| 957 | |||
def is_versionable(brain_or_object, policy='at_edit_autoversion'):
    """Checks if the passed in object is versionable.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :param policy: Id of the versioning policy the object must support
    :type policy: string
    :returns: True if the object supports the policy and is versionable
    :rtype: bool
    """
    pr = get_tool("portal_repository")
    obj = get_object(brain_or_object)
    # honour the `policy` argument instead of a hard-coded policy id
    return pr.supportsPolicy(obj, policy) and pr.isVersionable(obj)
||
| 970 | |||
| 971 | |||
def get_version(brain_or_object):
    """Return the `version_id` of the object.

    The attribute is read from the acquisition-unwrapped object and defaults
    to 0 when it is not set.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: The current version of the object, or None if the object is
        not versionable
    :rtype: int or None
    """
    obj = get_object(brain_or_object)
    if is_versionable(obj):
        return getattr(aq_base(obj), "version_id", 0)
    return None
||
| 984 | |||
| 985 | |||
def get_view(name, context=None, request=None):
    """Look up a named view (multi adapter) for a context and request.

    :param name: The name of the view
    :type name: str
    :param context: The context to query the view; the portal if omitted
    :type context: ATContentType/DexterityContentType/CatalogBrain
    :param request: The request to query the view; the global request if
        omitted
    :type request: HTTPRequest object
    :returns: The view registered under the given name
    :rtype: Products.Five.metaclass View object
    """
    if not context:
        context = get_portal()
    if not request:
        request = get_request() or None
    return getMultiAdapter((get_object(context), request), name=name)
||
| 1001 | |||
| 1002 | |||
def get_request():
    """Return the request provided by `zope.globalrequest`.

    :returns: HTTP Request
    :rtype: HTTPRequest object
    """
    request = globalrequest.getRequest()
    return request
||
| 1010 | |||
| 1011 | |||
def get_group(group_or_groupname):
    """Return the Plone group for a group object or group id.

    :param group_or_groupname: Plone group or the name of the group
    :type group_or_groupname: GroupData/str
    :returns: Plone GroupData, or None for a falsy argument
    """
    if group_or_groupname:
        if hasattr(group_or_groupname, "_getGroup"):
            # already a group object
            return group_or_groupname
        groups_tool = get_tool("portal_groups")
        return groups_tool.getGroupById(group_or_groupname)
    return None
||
| 1026 | |||
| 1027 | |||
def get_user(user_or_username):
    """Return the Plone user for a user object or user id.

    Objects providing `getUserId` are re-fetched through plone.api by the
    id they report.

    :param user_or_username: Plone user or user id
    :returns: Plone MemberData, or None for a falsy argument
    """
    if not user_or_username:
        return None
    if not hasattr(user_or_username, "getUserId"):
        return ploneapi.user.get(userid=user_or_username)
    return ploneapi.user.get(user_or_username.getUserId())
||
| 1039 | |||
| 1040 | |||
def get_user_properties(user_or_username):
    """Return the merged property sheet items of a Plone user.

    :param user_or_username: Plone user or user id
    :returns: Mapping of property name to value; empty if the user cannot be
        resolved or does not offer a callable `getUser`
    :rtype: dict
    """
    member = get_user(user_or_username)
    if member is None or not callable(member.getUser):
        return {}
    properties = {}
    plone_user = member.getUser()
    for sheet_id in plone_user.listPropertysheets():
        sheet = plone_user.getPropertysheet(sheet_id)
        # later sheets override equally named properties of earlier ones
        properties.update(dict(sheet.propertyItems()))
    return properties
||
| 1058 | |||
| 1059 | |||
def get_users_by_roles(roles=None):
    """Search Plone users by their roles.

    :param roles: Plone role name or list/tuple of roles; any other value is
        wrapped into a one-element list
    :type roles: list/str
    :returns: Result of `portal_membership.searchForMembers` for the role(s)
    """
    role_list = roles if isinstance(roles, (tuple, list)) else [roles]
    membership_tool = get_tool("portal_membership")
    return membership_tool.searchForMembers(roles=role_list)
||
| 1071 | |||
| 1072 | |||
def get_current_user():
    """Return the user plone.api reports as the current one.

    :returns: Current User
    """
    current = ploneapi.user.get_current()
    return current
||
| 1079 | |||
| 1080 | |||
def get_user_contact(user, contact_types=['Contact', 'LabContact']):
    """Return the contact object bound to a Plone user.

    The portal catalog is searched for contacts of the given portal types
    whose `getUsername` matches the id of the user.

    :param user: Plone user
    :param contact_types: List with the contact portal types to search
    :returns: The contact bound to the user; None if the user is falsy, has
        no contact, or is bound to more than one contact (the latter is
        logged as an error)
    """
    if not user:
        return None

    found = search(
        {'portal_type': contact_types, 'getUsername': user.id},
        catalog='portal_catalog')
    if not found:
        return None

    if len(found) <= 1:
        return get_object(found[0])

    # Ambiguous binding: report it and refuse to pick one
    titles = [brain.Title for brain in found]
    err_msg = "User '{}' is bound to multiple Contacts '{}'"
    logger.error(err_msg.format(user.id, ','.join(titles)))
    return None
||
| 1108 | |||
| 1109 | |||
def get_user_client(user_or_contact):
    """Return the client that the contact of a Plone user belongs to.

    :param user_or_contact: Plone user or contact
    :returns: The parent of the contact if it provides `IClient`; None for
        falsy input, lab contacts, users without a client contact, or
        contacts whose parent is not a client
    """
    if not user_or_contact:
        return None
    if ILabContact.providedBy(user_or_contact):
        # Lab contacts cannot belong to a client
        return None

    if IContact.providedBy(user_or_contact):
        parent = get_parent(user_or_contact)
        if parent and IClient.providedBy(parent):
            return parent
        return None

    # Not a contact: resolve the client contact of the user and retry
    contact = get_user_contact(user_or_contact, contact_types=['Contact'])
    if not IContact.providedBy(contact):
        return None
    return get_user_client(contact)
||
| 1134 | |||
| 1135 | |||
def get_current_client():
    """Return the client of the currently logged in user, if any.

    :returns: Client the current logged in user belongs to or None
    """
    current_user = get_current_user()
    return get_user_client(current_user)
||
| 1142 | |||
| 1143 | |||
def get_cache_key(brain_or_object):
    """Build a cache key from identifying values of a brain or object.

    The key joins portal type, id, UID, URL and the `micros()` value of the
    modification date with dashes.

    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    """
    portal_type = get_portal_type(brain_or_object)
    obj_id = get_id(brain_or_object)
    uid = get_uid(brain_or_object)
    # the URL makes the key differ between domains
    url = get_url(brain_or_object)
    # changes whenever the object is modified
    modified = get_modification_date(brain_or_object).micros()
    parts = (portal_type, obj_id, uid, url, modified)
    return "-".join([str(part) for part in parts])
||
| 1162 | |||
| 1163 | |||
def bika_cache_key_decorator(method, self, brain_or_object):
    """Cache key function for methods taking a brain or object.

    :param method: Unused by this key function
    :param self: Unused by this key function
    :param brain_or_object: A single catalog brain or content object
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
    :returns: Cache Key
    :rtype: str
    :raises DontCache: if `brain_or_object` is None
    """
    if brain_or_object is not None:
        return get_cache_key(brain_or_object)
    raise DontCache
||
| 1175 | |||
| 1176 | |||
def normalize_id(string):
    """Normalize a string with the registered `IIDNormalizer` utility.

    :param string: A string to normalize
    :type string: str
    :returns: Normalized ID
    :rtype: str
    """
    if not isinstance(string, basestring):
        fail("Type of argument must be string, found '{}'".format(type(string)))
    id_normalizer = getUtility(IIDNormalizer)
    return id_normalizer.normalize(string)
||
| 1190 | |||
| 1191 | |||
def normalize_filename(string):
    """Normalize a string with the registered `IFileNameNormalizer` utility.

    :param string: A string to normalize
    :type string: str
    :returns: Normalized filename
    :rtype: str
    """
    if not isinstance(string, basestring):
        fail("Type of argument must be string, found '{}'".format(type(string)))
    filename_normalizer = getUtility(IFileNameNormalizer)
    return filename_normalizer.normalize(string)
||
| 1205 | |||
| 1206 | |||
def is_uid(uid, validate=False):
    """Checks if the passed in uid is a valid UID

    :param uid: The uid to check
    :param validate: If False, only checks that uid is a string of length 32
        matching `UID_RX`. If True, also verifies that the uid catalog holds
        a brain for the uid passed in
    :type uid: string
    :return: True if a valid uid
    :rtype: bool
    """
    well_formed = (
        isinstance(uid, basestring)
        and len(uid) == 32
        and bool(UID_RX.match(uid))
    )
    if not (well_formed and validate):
        return well_formed

    # Check if a brain for this uid exists
    uid_catalog = get_tool('uid_catalog')
    matches = uid_catalog(UID=uid)
    if matches:
        assert (len(matches) == 1)
    return len(matches) > 0
||
| 1232 | |||
| 1233 | |||
def is_date(date):
    """Check whether the value is a Zope DateTime or a Python datetime.

    :param date: The date to check
    :type date: DateTime/datetime
    :return: True for a truthy DateTime or datetime instance
    :rtype: bool
    """
    return bool(date) and isinstance(date, (DateTime, datetime))
||
| 1245 | |||
| 1246 | |||
def to_date(value, default=None):
    """Tries to convert the passed in value to Zope's DateTime

    A DateTime is returned unchanged. A falsy value, or a value that cannot
    be converted, yields the converted `default` (None if no default was
    given or the default itself cannot be converted).

    :param value: The value to be converted to a valid DateTime
    :type value: str, DateTime or datetime
    :param default: Fallback value, converted with the same rules
    :return: The DateTime representation of the value passed in or default
    """
    if isinstance(value, DateTime):
        return value
    if not value:
        if default is None:
            return None
        return to_date(default)
    try:
        if isinstance(value, str) and '.' in value:
            # https://docs.plone.org/develop/plone/misc/datetime.html#datetime-problems-and-pitfalls
            return DateTime(value, datefmt='international')
        return DateTime(value)
    except Exception:
        # Any conversion error falls back to the default. KeyboardInterrupt
        # and SystemExit are deliberately not swallowed here.
        return to_date(default)
||
| 1267 | |||
| 1268 | |||
def to_minutes(days=0, hours=0, minutes=0, seconds=0, milliseconds=0,
               round_to_int=True):
    """Return the given time span expressed in minutes.

    Every argument is converted with `float` before it is summed up.

    :param round_to_int: Round the total and return it as int if True,
        otherwise return the exact float
    :returns: Total number of minutes
    :rtype: int or float
    """
    from_days = float(days) * 24 * 60
    from_hours = float(hours) * 60
    from_minutes = float(minutes)
    from_seconds = float(seconds) / 60
    from_millis = float(milliseconds) / 1000 / 60
    total = from_days + from_hours + from_minutes + from_seconds + from_millis
    if round_to_int:
        return int(round(total))
    return total
||
| 1276 | |||
| 1277 | |||
def to_dhm_format(days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
    """Return the given time span as a string in "xd yh zm" format.

    Parts that are zero are left out, except for the hours, which are always
    shown when both the days and the minutes part are present.

    :returns: The formatted time span, stripped of surrounding whitespace
    :rtype: str
    """
    total = to_minutes(days=days, hours=hours, minutes=minutes,
                       seconds=seconds, milliseconds=milliseconds)
    span = timedelta(minutes=int(round(total)))
    day_count = span.days
    hour_count, leftover = divmod(span.seconds, 3600)
    minute_count = leftover // 60

    minutes_part = "{}m ".format(str(minute_count)) if minute_count else ""
    days_part = "{}d ".format(str(day_count)) if day_count else ""
    # keep "0h" between a days and a minutes part
    show_hours = bool(hour_count) or bool(minutes_part and days_part)
    hours_part = "{}h ".format(str(hour_count)) if show_hours else ""
    return (days_part + hours_part + minutes_part).strip()
||
| 1294 | |||
| 1295 | |||
def to_int(value, default=_marker):
    """Tries to convert the value to int.
    Truncates at the decimal point if the value is a float

    :param value: The value to be converted to an int
    :param default: Fallback used when the value cannot be converted. None
        is returned as-is, any other default is converted with `to_int`. If
        omitted, `fail` is called instead
    :return: The resulting int or default
    """
    if is_floatable(value):
        # go through float first so that strings like "1.9" are accepted
        value = to_float(value)
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # ValueError also covers NaN; OverflowError is raised for infinite
        # floats (e.g. "inf"), which pass the floatable check above
        if default is None:
            return default
        if default is not _marker:
            return to_int(default)
        fail("Value %s cannot be converted to int" % repr(value))
||
| 1313 | |||
| 1314 | |||
def is_floatable(value):
    """Check whether `float(value)` succeeds.

    :param value: The value to be evaluated as a float number
    :type value: str, float, int
    :returns: True if the value can be converted, False if the conversion
        raises TypeError or ValueError
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError):
        return False
    return True
||
| 1327 | |||
| 1328 | |||
def to_float(value, default=_marker):
    """Convert the passed in value to a float number.

    :param value: The value to be converted to a floatable number
    :type value: str, float, int
    :param default: Fallback converted with `to_float` when the value is not
        floatable. If omitted, `fail` is called instead
    :returns: The float number representation of the passed in value
    :rtype: float
    """
    if is_floatable(value):
        return float(value)
    if default is not _marker:
        return to_float(default)
    fail("Value %s is not floatable" % repr(value))
    return float(value)
||
| 1342 | |||
| 1343 | |||
def to_searchable_text_metadata(value):
    """Parse the given metadata value to searchable text

    Falsy values, `Missing.Value`, UIDs and booleans yield an empty string.
    Lists, tuples and dicts (their values) are converted item by item and
    the non-empty results are joined with a single space. Dates are
    formatted as YYYY-MM-DD; anything else is converted to unicode.

    :param value: The raw value of the metadata column
    :returns: Searchable text, empty if nothing searchable was found
    """
    if not value:
        return u""
    if value is Missing.Value:
        return u""
    if is_uid(value):
        return u""
    if isinstance(value, bool):
        return u""
    if isinstance(value, (list, tuple)):
        # index every item, not only the first one
        parts = [to_searchable_text_metadata(item) for item in value]
        return u" ".join([part for part in parts if part])
    if isinstance(value, dict):
        # only the values of a mapping are considered searchable
        return to_searchable_text_metadata(list(value.values()))
    if is_date(value):
        return value.strftime("%Y-%m-%d")
    if not isinstance(value, basestring):
        value = str(value)
    return safe_unicode(value)
||
| 1369 |