Passed
Push — 2.x ( d59516...b5e2f5 )
by Jordi
06:37
created

bika.lims.utils.user_fullname()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 11
rs 9.95
c 0
b 0
f 0
cc 3
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import mimetypes
22
import os
23
import re
24
import tempfile
25
from email import Encoders
26
27
from email.MIMEBase import MIMEBase
28
from time import time
29
30
import six
31
from AccessControl import ModuleSecurityInfo
32
from AccessControl import allow_module
33
from AccessControl import getSecurityManager
34
from Acquisition import aq_inner
35
from Acquisition import aq_parent
36
from bika.lims import api
37
from bika.lims import logger
38
from bika.lims.browser import BrowserView
39
from bika.lims.interfaces import IClient
40
from bika.lims.interfaces import IClientAwareMixin
41
from DateTime import DateTime
42
from plone.protect.utils import addTokenToUrl
43
from plone.registry.interfaces import IRegistry
44
from plone.subrequest import subrequest
45
from Products.Archetypes.interfaces.field import IComputedField
46
from Products.Archetypes.public import DisplayList
47
from Products.CMFCore.utils import getToolByName
48
from Products.CMFCore.WorkflowCore import WorkflowException
49
from Products.CMFPlone.utils import safe_unicode
50
from Products.DCWorkflow.events import AfterTransitionEvent
51
from senaite.core.p3compat import cmp
52
from six.moves.urllib.request import urlopen
53
from weasyprint import CSS
54
from weasyprint import HTML
55
from weasyprint import default_url_fetcher
56
from zope.component import queryUtility
57
from zope.event import notify
58
from zope.i18n.locales import locales
59
60
ModuleSecurityInfo('email.Utils').declarePublic('formataddr')
61
allow_module('csv')
62
63
64
def to_utf8(text):
65
    if text is None:
66
        text = ''
67
    unicode_obj = safe_unicode(text)
68
    # If it receives a dictionary or list, it will not work
69
    if isinstance(unicode_obj, unicode):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable unicode does not seem to be defined.
Loading history...
70
        return unicode_obj.encode('utf-8')
71
    return unicode_obj
72
73
74
def to_unicode(text):
75
    if text is None:
76
        text = ''
77
    return safe_unicode(text)
78
79
80
def t(i18n_msg):
81
    """Safely translate and convert to UTF8, any zope i18n msgid returned from
82
    a bikaMessageFactory _
83
    """
84
    # cannot use bika.lims.deprecated (circular dependencies)
85
    import warnings
86
    warnings.simplefilter("always", DeprecationWarning)
87
    warn = "Deprecated: use senaite.core.i18n.translate instead"
88
    warnings.warn(warn, category=DeprecationWarning, stacklevel=2)
89
    warnings.simplefilter("default", DeprecationWarning)
90
91
    # prevent circular dependencies
92
    from senaite.core.i18n import translate
93
    return translate(i18n_msg)
94
95
96
# Wrapper for PortalTransport's sendmail - don't know why there sendmail
97
# method is marked private
98
ModuleSecurityInfo('Products.bika.utils').declarePublic('sendmail')
99
# Protected( Publish, 'sendmail')
100
101
102
def sendmail(portal, from_addr, to_addrs, msg):
103
    mailspool = portal.portal_mailspool
104
    mailspool.sendmail(from_addr, to_addrs, msg)
105
106
107
class js_log(BrowserView):
108
109
    def __call__(self, message):
110
        """Javascript sends a string for us to place into the log.
111
        """
112
        self.logger.info(message)
113
114
115
class js_err(BrowserView):
116
117
    def __call__(self, message):
118
        """Javascript sends a string for us to place into the error log
119
        """
120
        self.logger.error(message)
121
122
123
class js_warn(BrowserView):
124
125
    def __call__(self, message):
126
        """Javascript sends a string for us to place into the warn log
127
        """
128
        self.logger.warning(message)
129
130
131
def getUsers(context, roles, allow_empty=True):
132
    """ Present a DisplayList containing users in the specified
133
        list of roles
134
    """
135
    mtool = getToolByName(context, 'portal_membership')
136
    pairs = allow_empty and [['', '']] or []
137
    users = mtool.searchForMembers(roles=roles)
138
    for user in users:
139
        uid = user.getId()
140
        fullname = user.getProperty('fullname')
141
        if not fullname:
142
            fullname = uid
143
        pairs.append((uid, fullname))
144
    pairs.sort(lambda x, y: cmp(x[1], y[1]))
145
    return DisplayList(pairs)
146
147
148
def formatDateQuery(context, date_id):
149
    """ Obtain and reformat the from and to dates
150
        into a date query construct
151
    """
152
    from_date = context.REQUEST.get('%s_fromdate' % date_id, None)
153
    if from_date:
154
        from_date = from_date + ' 00:00'
155
    to_date = context.REQUEST.get('%s_todate' % date_id, None)
156
    if to_date:
157
        to_date = to_date + ' 23:59'
158
159
    date_query = {}
160
    if from_date and to_date:
161
        date_query = {'query': [from_date, to_date],
162
                      'range': 'min:max'}
163
    elif from_date or to_date:
164
        date_query = {'query': from_date or to_date,
165
                      'range': from_date and 'min' or 'max'}
166
167
    return date_query
168
169
170
def formatDateParms(context, date_id):
171
    """ Obtain and reformat the from and to dates
172
        into a printable date parameter construct
173
    """
174
    from_date = context.REQUEST.get('%s_fromdate' % date_id, None)
175
    to_date = context.REQUEST.get('%s_todate' % date_id, None)
176
177
    date_parms = {}
178
    if from_date and to_date:
179
        date_parms = 'from %s to %s' % (from_date, to_date)
180
    elif from_date:
181
        date_parms = 'from %s' % (from_date)
182
    elif to_date:
183
        date_parms = 'to %s' % (to_date)
184
185
    return date_parms
186
187
188
def formatDecimalMark(value, decimalmark='.'):
189
    """
190
        Dummy method to replace decimal mark from an input string.
191
        Assumes that 'value' uses '.' as decimal mark and ',' as
192
        thousand mark.
193
        ::value:: is a string
194
        ::returns:: is a string with the decimal mark if needed
195
    """
196
    # We have to consider the possibility of working with decimals such as
197
    # X.000 where those decimals are important because of the precission
198
    # and significant digits matters
199
    # Using 'float' the system delete the extre desimals with 0 as a value
200
    # Example: float(2.00) -> 2.0
201
    # So we have to save the decimal length, this is one reason we are usnig
202
    # strings for results
203
    rawval = str(value)
204
    try:
205
        return decimalmark.join(rawval.split('.'))
206
    except Exception:
207
        return rawval
208
209
210
# encode_header function copied from roundup's rfc2822 package.
211
hqre = re.compile(r'^[A-z0-9!"#$%%&\'()*+,-./:;<=>?@\[\]^_`{|}~ ]+$')
212
213
ModuleSecurityInfo('Products.bika.utils').declarePublic('encode_header')
214
215
216
def encode_header(header, charset='utf-8'):
217
    """ Will encode in quoted-printable encoding only if header
218
    contains non latin characters
219
    """
220
221
    # Return empty headers unchanged
222
    if not header:
223
        return header
224
225
    # return plain header if it does not contain non-ascii characters
226
    if hqre.match(header):
227
        return header
228
229
    quoted = ''
230
    # max_encoded = 76 - len(charset) - 7
231
    for c in header:
232
        # Space may be represented as _ instead of =20 for readability
233
        if c == ' ':
234
            quoted += '_'
235
        # These characters can be included verbatim
236
        elif hqre.match(c):
237
            quoted += c
238
        # Otherwise, replace with hex value like =E2
239
        else:
240
            quoted += "=%02X" % ord(c)
241
242
    return '=?%s?q?%s?=' % (charset, quoted)
243
244
245
def zero_fill(matchobj):
246
    return matchobj.group().zfill(8)
247
248
249
num_sort_regex = re.compile(r'\d+')
250
251
ModuleSecurityInfo('Products.bika.utils').declarePublic('sortable_title')
252
253
254
def sortable_title(portal, title):
255
    """Convert title to sortable title
256
    """
257
    if not title:
258
        return ''
259
260
    def_charset = portal.plone_utils.getSiteEncoding()
261
    sortabletitle = str(title.lower().strip())
262
    # Replace numbers with zero filled numbers
263
    sortabletitle = num_sort_regex.sub(zero_fill, sortabletitle)
264
    # Truncate to prevent bloat
265
    for charset in [def_charset, 'latin-1', 'utf-8']:
266
        try:
267
            sortabletitle = safe_unicode(sortabletitle, charset)[:30]
268
            sortabletitle = sortabletitle.encode(def_charset or 'utf-8')
269
            break
270
        except UnicodeError:
271
            pass
272
        except TypeError:
273
            # If we get a TypeError if we already have a unicode string
274
            sortabletitle = sortabletitle[:30]
275
            break
276
    return sortabletitle
277
278
# TODO Remove this function
279
def logged_in_client(context, member=None):
280
    return api.get_current_client()
281
282
def changeWorkflowState(content, wf_id, state_id, **kw):
283
    """Change the workflow state of an object
284
    @param content: Content obj which state will be changed
285
    @param state_id: name of the state to put on content
286
    @param kw: change the values of same name of the state mapping
287
    @return: True if succeed. Otherwise, False
288
    """
289
    portal_workflow = api.get_tool("portal_workflow")
290
    workflow = portal_workflow.getWorkflowById(wf_id)
291
    if not workflow:
292
        logger.error("%s: Cannot find workflow id %s" % (content, wf_id))
293
        return False
294
295
    wf_state = {
296
        'action': kw.get("action", None),
297
        'actor': kw.get("actor", api.get_current_user().id),
298
        'comments': "Setting state to %s" % state_id,
299
        'review_state': state_id,
300
        'time': DateTime()
301
    }
302
303
    # Get old and new state info
304
    old_state = workflow._getWorkflowStateOf(content)
305
    new_state = workflow.states.get(state_id, None)
306
    if new_state is None:
307
        raise WorkflowException("Destination state undefined: {}"
308
                                .format(state_id))
309
310
    # Change status and update permissions
311
    portal_workflow.setStatusOf(wf_id, content, wf_state)
312
    workflow.updateRoleMappingsFor(content)
313
314
    # Notify the object has been transitioned
315
    notify(AfterTransitionEvent(content, workflow, old_state, new_state, None,
316
                                wf_state, None))
317
318
    # Map changes to catalog
319
    indexes = ["allowedRolesAndUsers", "review_state", "is_active"]
320
    content.reindexObject(idxs=indexes)
321
    return True
322
323
324
def tmpID():
325
    import binascii
326
    return binascii.hexlify(os.urandom(16))
327
328
329
def isnumber(s):
330
    return api.is_floatable(s)
331
332
333
def senaite_url_fetcher(url):
334
    """Uses plone.subrequest to fetch an internal image resource.
335
336
    If the URL points to an external resource, the URL is handed
337
    to weasyprint.default_url_fetcher.
338
339
    Please see these links for details:
340
341
        - https://github.com/plone/plone.subrequest
342
        - https://pypi.python.org/pypi/plone.subrequest
343
        - https://github.com/senaite/senaite.core/issues/538
344
345
    :returns: A dict with the following keys:
346
347
        * One of ``string`` (a byte string) or ``file_obj``
348
          (a file-like object)
349
        * Optionally: ``mime_type``, a MIME type extracted e.g. from a
350
          *Content-Type* header. If not provided, the type is guessed from the
351
          file extension in the URL.
352
        * Optionally: ``encoding``, a character encoding extracted e.g. from a
353
          *charset* parameter in a *Content-Type* header
354
        * Optionally: ``redirected_url``, the actual URL of the resource
355
          if there were e.g. HTTP redirects.
356
        * Optionally: ``filename``, the filename of the resource. Usually
357
          derived from the *filename* parameter in a *Content-Disposition*
358
          header
359
360
        If a ``file_obj`` key is given, it is the caller’s responsibility
361
        to call ``file_obj.close()``.
362
    """
363
364
    logger.info("Fetching URL '{}' for WeasyPrint".format(url))
365
366
    # get the pyhsical path from the URL
367
    request = api.get_request()
368
    host = request.get_header("HOST")
369
    path = "/".join(request.physicalPathFromURL(url))
370
371
    # fetch the object by sub-request
372
    portal = api.get_portal()
373
    context = portal.restrictedTraverse(path, None)
374
375
    # We double check here to avoid an edge case, where we have the same path
376
    # as well in our local site, e.g. we have `/senaite/img/systems/senaite.png`,
377
    # but the user requested http://www.ridingbytes.com/img/systems/senaite.png:
378
    #
379
    # "/".join(request.physicalPathFromURL("http://www.ridingbytes.com/img/systems/senaite.png"))
380
    # '/senaite/img/systems/senaite.png'
381
    if context is None or host not in url:
382
        logger.info("URL is external, passing over to the default URL fetcher...")
383
        return default_url_fetcher(url)
384
385
    logger.info("URL is local, fetching data by path '{}' via subrequest".format(path))
386
387
    # get the data via an authenticated subrequest
388
    response = subrequest(path)
389
390
    # Prepare the return data as required by WeasyPrint
391
    string = response.getBody()
392
    filename = url.split("/")[-1]
393
    mime_type = mimetypes.guess_type(url)[0]
394
    redirected_url = url
395
396
    return {
397
        "string": string,
398
        "filename": filename,
399
        "mime_type": mime_type,
400
        "redirected_url": redirected_url,
401
    }
402
403
404
def createPdf(htmlreport, outfile=None, css=None, images={}):
405
    """create a PDF from some HTML.
406
    htmlreport: rendered html
407
    outfile: pdf filename; if supplied, caller is responsible for creating
408
             and removing it.
409
    css: remote URL of css file to download
410
    images: A dictionary containing possible URLs (keys) and local filenames
411
            (values) with which they may to be replaced during rendering.
412
    # WeasyPrint will attempt to retrieve images directly from the URL
413
    # referenced in the HTML report, which may refer back to a single-threaded
414
    # (and currently occupied) zeoclient, hanging it.  All image source
415
    # URL's referenced in htmlreport should be local files.
416
    """
417
    # A list of files that should be removed after PDF is written
418
    cleanup = []
419
    css_def = ''
420
    if css:
421
        if css.startswith("http://") or css.startswith("https://"):
422
            # Download css file in temp dir
423
            u = urlopen(css)
424
            _cssfile = tempfile.mktemp(suffix='.css')
425
            localFile = open(_cssfile, 'w')
426
            localFile.write(u.read())
427
            localFile.close()
428
            cleanup.append(_cssfile)
429
        else:
430
            _cssfile = css
431
        cssfile = open(_cssfile, 'r')
432
        css_def = cssfile.read()
433
434
    htmlreport = to_utf8(htmlreport)
435
436
    for (key, val) in images.items():
437
        htmlreport = htmlreport.replace(key, val)
438
439
    # render
440
    htmlreport = to_utf8(htmlreport)
441
    renderer = HTML(string=htmlreport, url_fetcher=senaite_url_fetcher, encoding='utf-8')
442
    pdf_fn = outfile if outfile else tempfile.mktemp(suffix=".pdf")
443
    if css:
444
        renderer.write_pdf(pdf_fn, stylesheets=[CSS(string=css_def)])
445
    else:
446
        renderer.write_pdf(pdf_fn)
447
    # return file data
448
    pdf_data = open(pdf_fn, "rb").read()
449
    if outfile is None:
450
        os.remove(pdf_fn)
451
    for fn in cleanup:
452
        os.remove(fn)
453
    return pdf_data
454
455
456
def attachPdf(mimemultipart, pdfreport, filename=None):
457
    part = MIMEBase('application', "pdf")
458
    part.add_header('Content-Disposition',
459
                    'attachment; filename="%s.pdf"' % (filename or tmpID()))
460
    part.set_payload(pdfreport)
461
    Encoders.encode_base64(part)
462
    mimemultipart.attach(part)
463
464
465
def currency_format(context, locale):
466
    locale = locales.getLocale(locale)
467
    currency = context.bika_setup.getCurrency()
468
    symbol = locale.numbers.currencies[currency].symbol
469
470
    def format(val):
471
        return '%s %0.2f' % (symbol, val)
472
    return format
473
474
475
def getHiddenAttributesForClass(classname):
476
    try:
477
        registry = queryUtility(IRegistry)
478
        hiddenattributes = registry.get('bika.lims.hiddenattributes', ())
479
        if hiddenattributes is not None:
480
            for alist in hiddenattributes:
481
                if alist[0] == classname:
482
                    return alist[1:]
483
    except Exception:
484
        logger.warning(
485
            'Probem accessing optionally hidden attributes in registry')
486
487
    return []
488
489
490
def dicts_to_dict(dictionaries, key_subfieldname):
491
    """Convert a list of dictionaries into a dictionary of dictionaries.
492
493
    key_subfieldname must exist in each Record's subfields and have a value,
494
    which will be used as the key for the new dictionary. If a key is duplicated,
495
    the earlier value will be overwritten.
496
    """
497
    result = {}
498
    for d in dictionaries:
499
        result[d[key_subfieldname]] = d
500
    return result
501
502
503
def format_supsub(text):
504
    """
505
    Mainly used for Analysis Service's unit. Transform the text adding
506
    sub and super html scripts:
507
    For super-scripts, use ^ char
508
    For sub-scripts, use _ char
509
    The expression "cm^2" will be translated to "cm²" and the
510
    expression "b_(n-1)" will be translated to "b n-1".
511
    The expression "n_(fibras)/cm^3" will be translated as
512
    "n fibras / cm³"
513
    :param text: text to be formatted
514
    """
515
    out = []
516
    subsup = []
517
    clauses = []
518
    insubsup = True
519
    for c in str(text).strip():
520
        if c == '(':
521
            if insubsup is False:
522
                out.append(c)
523
                clauses.append(')')
524
            else:
525
                clauses.append('')
526
527
        elif c == ')':
528
            if len(clauses) > 0:
529
                out.append(clauses.pop())
530
                if len(subsup) > 0:
531
                    out.append(subsup.pop())
532
533
        elif c == '^':
534
            subsup.append('</sup>')
535
            out.append('<sup>')
536
            insubsup = True
537
            continue
538
539
        elif c == '_':
540
            subsup.append('</sub>')
541
            out.append('<sub>')
542
            insubsup = True
543
            continue
544
545
        elif c == ' ':
546
            if insubsup is True:
547
                out.append(subsup.pop())
548
            else:
549
                out.append(c)
550
        elif c in ['+', '-']:
551
            if len(clauses) == 0 and len(subsup) > 0:
552
                out.append(subsup.pop())
553
            out.append(c)
554
        else:
555
            out.append(c)
556
557
        insubsup = False
558
559
    while True:
560
        if len(subsup) == 0:
561
            break
562
        out.append(subsup.pop())
563
564
    return ''.join(out)
565
566
567
def checkPermissions(permissions=[], obj=None):
568
    """
569
    Checks if a user has permissions for a given object.
570
571
    Args:
572
        permissions: The permissions the current user must be compliant with
573
        obj: The object for which the permissions apply
574
575
    Returns:
576
        1 if the user complies with all the permissions for the given object.
577
        Otherwise, it returns empty.
578
    """
579
    if not obj:
580
        return False
581
    sm = getSecurityManager()
582
    for perm in permissions:
583
        if not sm.checkPermission(perm, obj):
584
            return ''
585
    return True
586
587
588
def getFromString(obj, string, default=None):
589
    attr_obj = obj
590
    attrs = string.split('.')
591
    for attr in attrs:
592
        attr_obj = api.safe_getattr(attr_obj, attr, default=None)
593
        if not attr_obj:
594
            break
595
    return attr_obj or default
596
597
598
def measure_time(func_to_measure):
599
    """
600
    This decorator allows to measure the execution time
601
    of a function and prints it to the console.
602
    :param func_to_measure: function to be decorated
603
    """
604
    def wrap(*args, **kwargs):
605
        start_time = time()
606
        return_value = func_to_measure(*args, **kwargs)
607
        finish_time = time()
608
        log = "%s took %0.4f seconds. start_time = %0.4f - finish_time = %0.4f\n" % (func_to_measure.func_name,
609
                                                                                     finish_time - start_time,
610
                                                                                     start_time,
611
                                                                                     finish_time)
612
        print(log)
613
        return return_value
614
    return wrap
615
616
617
def copy_field_values(src, dst, ignore_fieldnames=None, ignore_fieldtypes=None):
618
    ignore_fields = ignore_fieldnames if ignore_fieldnames else []
619
    ignore_types = ignore_fieldtypes if ignore_fieldtypes else []
620
    if 'id' not in ignore_fields:
621
        ignore_fields.append('id')
622
623
    src_schema = src.Schema()
624
    dst_schema = dst.Schema()
625
626
    for field in src_schema.fields():
627
        if IComputedField.providedBy(field):
628
            continue
629
        fieldname = field.getName()
630
        if fieldname in ignore_fields \
631
                or field.type in ignore_types \
632
                or fieldname not in dst_schema:
633
            continue
634
        value = field.get(src)
635
        if value:
636
            dst_schema[fieldname].set(dst, value)
637
638
639
def get_link(href, value=None, csrf=True, **kwargs):
640
    """
641
    Returns a well-formed link. If href is None/empty, returns an empty string
642
    :param href: value to be set for attribute href
643
    :param value: the text to be displayed. If None, the href itself is used
644
    :param value: if True, the CSRF token is added in the href
645
    :param kwargs: additional attributes and values
646
    :return: a well-formed html anchor
647
    """
648
    if not href:
649
        return ""
650
    anchor_value = value and value or href
651
    attr = render_html_attributes(**kwargs)
652
    # Add a CSRF token
653
    if csrf and href.startswith("http"):
654
        href = addTokenToUrl(href)
655
    return '<a href="{}" {}>{}</a>'.format(href, attr, anchor_value)
656
657
658
def get_link_for(obj, **kwargs):
659
    """Returns a well-formed html anchor to the object
660
    """
661
    if not obj:
662
        return ""
663
    href = api.get_url(obj)
664
    value = api.get_title(obj)
665
    return get_link(href=href, value=value, **kwargs)
666
667
668
def get_email_link(email, value=None):
669
    """
670
    Returns a well-formed link to an email address. If email is None/empty,
671
    returns an empty string
672
    :param email: email address
673
    :param link_text: text to be displayed. If None, the email itself is used
674
    :return: a well-formatted html anchor
675
    """
676
    if not email:
677
        return ""
678
    mailto = 'mailto:{}'.format(email)
679
    link_value = value and value or email
680
    return get_link(mailto, link_value)
681
682
683
def get_image(name, **kwargs):
684
    """Returns a well-formed image
685
    :param name: file name of the image
686
    :param kwargs: additional attributes and values
687
    :return: a well-formed html img
688
    """
689
    if not name:
690
        return ""
691
    portal = api.get_portal()
692
    theme = portal.restrictedTraverse("@@senaite_theme")
693
    basename, ext = os.path.splitext(name)
694
    if basename in theme.icons():
695
        if "width" not in kwargs:
696
            kwargs["width"] = "16"
697
        return theme.icon_tag(basename, **kwargs)
698
    portal_url = api.get_url(portal)
699
    attr = render_html_attributes(**kwargs)
700
    html = '<img src="{}/++resource++bika.lims.images/{}" {}/>'
701
    return html.format(portal_url, name, attr)
702
703
704
def get_progress_bar_html(percentage):
705
    """Returns an html that represents a progress bar
706
    """
707
    return '<div class="progress md-progress">' \
708
           '<div class="progress-bar" style="width: {0}%">{0}%</div>' \
709
           '</div>'.format(percentage or 0)
710
711
712
def render_html_attributes(**kwargs):
713
    """Returns a string representation of attributes for html entities
714
    :param kwargs: attributes and values
715
    :return: a well-formed string representation of attributes"""
716
    attr = list()
717
    if kwargs:
718
        attr = ['{}="{}"'.format(key, val) for key, val in kwargs.items()]
719
    return " ".join(attr).replace("css_class", "class")
720
721
722
def get_registry_value(key, default=None):
723
    """
724
    Gets the utility for IRegistry and returns the value for the key passed in.
725
    If there is no value for the key passed in, returns default value
726
    :param key: the key in the registry to look for
727
    :param default: default value if the key is not registered
728
    :return: value in the registry for the key passed in
729
    """
730
    # cannot use bika.lims.deprecated (circular dependencies)
731
    import warnings
732
    warnings.simplefilter("always", DeprecationWarning)
733
    warn = "Deprecated: use senaite.core.api.get_registry_record instead"
734
    warnings.warn(warn, category=DeprecationWarning, stacklevel=2)
735
    warnings.simplefilter("default", DeprecationWarning)
736
    return api.get_registry_record(key, default=default)
737
738
739
def check_permission(permission, obj):
740
    """
741
    Returns if the current user has rights for the permission passed in against
742
    the obj passed in
743
    :param permission: name of the permission
744
    :param obj: the object to check the permission against for the current user
745
    :return: 1 if the user has rights for this permission for the passed in obj
746
    """
747
    mtool = api.get_tool('portal_membership')
748
    object = api.get_object(obj)
749
    return mtool.checkPermission(permission, object)
750
751
752
def to_int(value, default=0):
753
    """
754
    Tries to convert the value passed in as an int. If no success, returns the
755
    default value passed in
756
    :param value: the string to convert to integer
757
    :param default: the default fallback
758
    :return: int representation of the value passed in
759
    """
760
    try:
761
        return int(value)
762
    except (TypeError, ValueError):
763
        return to_int(default, default=0)
764
765
766
def get_strings(data):
767
    """
768
    Convert unicode values to strings even if they belong to lists or dicts.
769
    :param data: an object.
770
    :return: The object with all unicode values converted to string.
771
    """
772
    # if this is a unicode string, return its string representation
773
    if isinstance(data, unicode):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable unicode does not seem to be defined.
Loading history...
774
        return data.encode('utf-8')
775
776
    # if this is a list of values, return list of string values
777
    if isinstance(data, list):
778
        return map(get_strings, data)
779
780
    # if this is a dictionary, return dictionary of string keys and values
781
    if isinstance(data, dict):
782
783
        return {
784
            get_strings(key): get_strings(value)
785
            for key, value in six.iteritems(data)
786
        }
787
    # if it's anything else, return it in its original form
788
    return data
789
790
791
def get_unicode(data):
792
    """
793
    Convert string values to unicode even if they belong to lists or dicts.
794
    :param data: an object.
795
    :return: The object with all string values converted to unicode.
796
    """
797
    # if this is a common string, return its unicode representation
798
    if isinstance(data, str):
799
        return safe_unicode(data)
800
801
    # if this is a list of values, return list of unicode values
802
    if isinstance(data, list):
803
        return [get_unicode(item) for item in data]
804
805
    # if this is a dictionary, return dictionary of unicode keys and values
806
    if isinstance(data, dict):
807
        return {
808
            get_unicode(key): get_unicode(value)
809
            for key, value in six.iteritems(data)
810
        }
811
    # if it's anything else, return it in its original form
812
    return data
813
814
815
def is_bika_installed():
816
    """Check if Bika LIMS is installed in the Portal
817
    """
818
    qi = api.portal.get_tool("portal_quickinstaller")
819
    return qi.isProductInstalled("bika.lims")
820
821
822
def get_display_list(brains_or_objects=None, none_item=False):
823
    """
824
    Returns a DisplayList with the items sorted by Title
825
    :param brains_or_objects: list of brains or objects
826
    :param none_item: adds an item with empty uid and text "Select.." in pos 0
827
    :return: DisplayList (uid, title) sorted by title ascending
828
    :rtype: DisplayList
829
    """
830
    if brains_or_objects is None:
831
        return get_display_list(list(), none_item)
832
833
    items = list()
834
    for brain in brains_or_objects:
835
        uid = api.get_uid(brain)
836
        if not uid:
837
            continue
838
        title = api.get_title(brain)
839
        items.append((uid, title))
840
841
    # Sort items by title ascending
842
    items.sort(lambda x, y: cmp(x[1], y[1]))
843
844
    # Add the first item?
845
    if none_item:
846
        items.insert(0, ('', t('Select...')))
847
848
    return DisplayList(items)
849
850
851
def to_choices(display_list):
852
    """Converts a display list to a choices list
853
    """
854
    if not display_list:
855
        return []
856
857
    return map(
858
        lambda item: {
859
            "ResultValue": item[0],
860
            "ResultText": item[1]},
861
        display_list.items())
862
863
864
def chain(obj):
865
    """Generator to walk the acquistion chain of object, considering that it
866
    could be a function.
867
868
    If the thing we are accessing is actually a bound method on an instance,
869
    then after we've checked the method itself, get the instance it's bound to
870
    using im_self, so that we can continue to walk up the acquistion chain from
871
    it (incidentally, this is why we can't juse use aq_chain()).
872
    """
873
    context = aq_inner(obj)
874
875
    while context is not None:
876
        yield context
877
878
        func_object = getattr(context, "im_self", None)
879
        if func_object is not None:
880
            context = aq_inner(func_object)
881
        else:
882
            # Don't use aq_inner() since portal_factory (and probably other)
883
            # things, depends on being able to wrap itself in a fake context.
884
            context = aq_parent(context)
885
886
887
def get_client(obj):
888
    """Returns the client the object passed-in belongs to, if any
889
890
    This walks the acquisition chain up until we find something which provides
891
    either IClient or IClientAwareMixin
892
    """
893
    for obj in chain(obj):
894
        if IClient.providedBy(obj):
895
            return obj
896
        elif IClientAwareMixin.providedBy(obj):
897
            # ClientAwareMixin can return a Client, even if there is no client
898
            # in the acquisition chain
899
            return obj.getClient()
900
901
    return None
902