Passed
Push — 2.x ( 6101d0...7bb4b8 )
by Jordi
05:10 queued 37s
created

bika.lims.utils.getFromString()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 3
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import mimetypes
22
import os
23
import re
24
import six
25
import tempfile
26
from email import Encoders
27
from email.MIMEBase import MIMEBase
28
from six.moves.urllib.request import urlopen
29
from time import time
30
31
from AccessControl import ModuleSecurityInfo
32
from AccessControl import allow_module
33
from AccessControl import getSecurityManager
34
from Acquisition import aq_inner
35
from Acquisition import aq_parent
36
from bika.lims import api
37
from bika.lims import logger
38
from bika.lims.browser import BrowserView
39
from bika.lims.interfaces import IClient
40
from bika.lims.interfaces import IClientAwareMixin
41
from DateTime import DateTime
42
from plone.protect.utils import addTokenToUrl
43
from plone.registry.interfaces import IRegistry
44
from plone.subrequest import subrequest
45
from Products.Archetypes.interfaces.field import IComputedField
46
from Products.Archetypes.public import DisplayList
47
from Products.CMFCore.utils import getToolByName
48
from Products.CMFCore.WorkflowCore import WorkflowException
49
from Products.CMFPlone.utils import safe_unicode
50
from Products.DCWorkflow.events import AfterTransitionEvent
51
from senaite.core.p3compat import cmp
52
from weasyprint import CSS
53
from weasyprint import HTML
54
from weasyprint import default_url_fetcher
55
from zope.component import queryUtility
56
from zope.event import notify
57
from zope.i18n import translate
58
from zope.i18n.locales import locales
59
60
# Declare `formataddr` of the `email.Utils` module as public for
# AccessControl.
# NOTE(review): `email.Utils` is the Python 2 module name - confirm whether
# the lowercase `email.utils` name has to be declared as well
ModuleSecurityInfo('email.Utils').declarePublic('formataddr')
61
# Register the standard `csv` module with AccessControl's `allow_module`
allow_module('csv')
62
63
64
def to_utf8(text):
    """Return `text` encoded as UTF-8

    `None` is handled like an empty string. The value is first passed through
    `safe_unicode`; if the result is not a text object (e.g. a dictionary or
    a list was passed in), that result is returned without encoding.

    :param text: the value to encode
    :return: UTF-8 encoded byte string, or the `safe_unicode` result as-is
    """
    if text is None:
        text = ''
    unicode_obj = safe_unicode(text)
    # If it receives a dictionary or list, it will not work
    # `six.text_type` is `unicode` on Python 2 and `str` on Python 3, so the
    # check does not depend on the Python 2-only `unicode` builtin
    if isinstance(unicode_obj, six.text_type):
        return unicode_obj.encode('utf-8')
    return unicode_obj
72
73
74
def to_unicode(text):
    """Return the `safe_unicode` conversion of `text`

    :param text: the value to convert; `None` is handled like ''
    """
    return safe_unicode('' if text is None else text)
78
79
80
def t(i18n_msg):
    """Safely translate and convert to UTF8, any zope i18n msgid returned from
    a bikaMessageFactory _

    :param i18n_msg: the message id (or plain text) to translate. Its
        `domain` attribute is used if present, "senaite.core" otherwise
    :return: the `to_utf8` conversion of the translation, or of the
        untranslated text if translating raised a UnicodeDecodeError
    """
    text = to_unicode(i18n_msg)
    try:
        request = api.get_request()
        domain = getattr(i18n_msg, "domain", "senaite.core")
        text = translate(text, domain=domain, context=request)
    except UnicodeDecodeError:
        # TODO: This is only a quick fix
        # The text is handed over as a logging argument: formatting a
        # non-ASCII unicode value into a byte string template can raise a
        # UnicodeEncodeError on Python 2, right inside this handler
        logger.warning("%s couldn't be translated", text)
    return to_utf8(text)
93
94
95
# Wrapper for PortalTransport's sendmail - don't know why their sendmail
# method is marked private
97
# Declare `sendmail` as public for AccessControl.
# NOTE(review): the declaration is made for 'Products.bika.utils' - confirm
# this is the dotted name this module is actually imported under
ModuleSecurityInfo('Products.bika.utils').declarePublic('sendmail')
98
# Protected( Publish, 'sendmail')
99
100
101
def sendmail(portal, from_addr, to_addrs, msg):
    """Send a message through the `portal_mailspool` of the portal

    :param portal: object providing the `portal_mailspool` attribute
    :param from_addr: sender, passed on unchanged
    :param to_addrs: recipients, passed on unchanged
    :param msg: the message, passed on unchanged
    """
    portal.portal_mailspool.sendmail(from_addr, to_addrs, msg)
104
105
106
class js_log(BrowserView):
    """View that writes a message received from Javascript to the info log
    """

    def __call__(self, message):
        """Javascript sends a string for us to place into the log.

        :param message: the text to log with level info
        """
        self.logger.info(message)
112
113
114
class js_err(BrowserView):
    """View that writes a message received from Javascript to the error log
    """

    def __call__(self, message):
        """Javascript sends a string for us to place into the error log

        :param message: the text to log with level error
        """
        self.logger.error(message)
120
121
122
class js_warn(BrowserView):
    """View that writes a message received from Javascript to the warn log
    """

    def __call__(self, message):
        """Javascript sends a string for us to place into the warn log

        :param message: the text to log with level warning
        """
        self.logger.warning(message)
128
129
130
def getUsers(context, roles, allow_empty=True):
    """ Present a DisplayList containing users in the specified
        list of roles

    :param context: object used to look up the `portal_membership` tool
    :param roles: the roles handed over to `searchForMembers`
    :param allow_empty: if true, an empty ('', '') entry is included
    :return: DisplayList of (user id, fullname) pairs sorted by fullname;
        the user id is used for users without fullname
    """
    mtool = getToolByName(context, 'portal_membership')
    pairs = [['', '']] if allow_empty else []
    for user in mtool.searchForMembers(roles=roles):
        uid = user.getId()
        fullname = user.getProperty('fullname') or uid
        pairs.append((uid, fullname))
    # A key function gives the same (stable) order as the former cmp
    # function, but is also accepted by Python 3
    pairs.sort(key=lambda pair: pair[1])
    return DisplayList(pairs)
145
146
147
def formatDateQuery(context, date_id):
    """ Obtain and reformat the from and to dates
        into a date query construct

    The dates are read from the request keys "<date_id>_fromdate" and
    "<date_id>_todate". ' 00:00' is appended to the from date and ' 23:59'
    to the to date.

    :param context: object providing the `REQUEST`
    :param date_id: prefix of the request keys
    :return: dict with the keys 'query' and 'range' ('min:max', 'min' or
        'max'), or an empty dict if none of the dates is set
    """
    from_date = context.REQUEST.get('%s_fromdate' % date_id, None)
    if from_date:
        from_date = from_date + ' 00:00'
    to_date = context.REQUEST.get('%s_todate' % date_id, None)
    if to_date:
        to_date = to_date + ' 23:59'

    if from_date and to_date:
        return {'query': [from_date, to_date],
                'range': 'min:max'}
    if from_date:
        return {'query': from_date, 'range': 'min'}
    if to_date:
        return {'query': to_date, 'range': 'max'}
    return {}
167
168
169
def formatDateParms(context, date_id):
    """ Obtain and reformat the from and to dates
        into a printable date parameter construct

    The dates are read from the request keys "<date_id>_fromdate" and
    "<date_id>_todate".

    :param context: object providing the `REQUEST`
    :param date_id: prefix of the request keys
    :return: 'from X to Y', 'from X' or 'to Y'. An empty dict is returned
        if none of the dates is set
    """
    from_date = context.REQUEST.get('%s_fromdate' % date_id, None)
    to_date = context.REQUEST.get('%s_todate' % date_id, None)

    if from_date and to_date:
        return 'from %s to %s' % (from_date, to_date)
    # One-element tuples, so that a tuple value is rendered as a single
    # value instead of being unpacked as the format arguments
    if from_date:
        return 'from %s' % (from_date,)
    if to_date:
        return 'to %s' % (to_date,)
    # Same (falsy) value as returned before when no date is set
    return {}
185
186
187
def formatDecimalMark(value, decimalmark='.'):
    """
        Dummy method to replace decimal mark from an input string.
        Assumes that 'value' uses '.' as decimal mark and ',' as
        thousand mark.
        ::value:: is a string
        ::returns:: is a string with the decimal mark if needed
    """
    # The value is handled as a string on purpose: decimals such as X.000
    # matter because of the precision and the significant digits, and a
    # conversion to float drops the trailing zeros (e.g. "2.00" -> 2.0).
    # Keeping the decimal length is one of the reasons why results are
    # stored as strings
    rawval = str(value)
    try:
        converted = rawval.replace('.', decimalmark)
    except Exception:
        # The decimal mark cannot be used as a replacement: value as it is
        converted = rawval
    return converted
207
208
209
# encode_header function copied from roundup's rfc2822 package.
210
# Matches values made of printable ASCII characters (space to tilde) only.
# This is exactly the set the former character class accepted: its `A-z`
# range also covered the characters between "Z" and "a" (the backslash
# included), so the class is spelled out as a single explicit range
hqre = re.compile(r'^[\x20-\x7E]+$')
211
212
# Declare `encode_header` as public for AccessControl.
# NOTE(review): the declaration is made for 'Products.bika.utils' - confirm
# this is the dotted name this module is actually imported under
ModuleSecurityInfo('Products.bika.utils').declarePublic('encode_header')
213
214
215
def encode_header(header, charset='utf-8'):
    """ Will encode in quoted-printable encoding only if header
    contains non latin characters

    :param header: the header value, either text or a byte string that is
        already encoded with `charset`
    :param charset: charset named in the encoded-word; text input is
        encoded with it
    :return: `header` unchanged if it is empty or matches `hqre`, otherwise
        an encoded-word of the form "=?<charset>?q?<text>?="
    """

    # Return empty headers unchanged
    if not header:
        return header

    # return plain header if it does not contain non-ascii characters
    if hqre.match(header):
        return header

    # Escapes have to be built from the encoded octets: the code point of a
    # character (e.g. 0x20AC) is not a valid "=XX" escape
    if isinstance(header, bytes):
        octets = bytearray(header)
    else:
        octets = bytearray(header.encode(charset))

    quoted = []
    # max_encoded = 76 - len(charset) - 7
    for octet in octets:
        char = chr(octet)
        # Space may be represented as _ instead of =20 for readability
        if char == ' ':
            quoted.append('_')
        # "=", "?" and "_" have a special meaning inside an encoded-word,
        # they are escaped like the octets that cannot be included verbatim
        elif char in '=?_' or not hqre.match(char):
            quoted.append('=%02X' % octet)
        # These characters can be included verbatim
        else:
            quoted.append(char)

    return '=?%s?q?%s?=' % (charset, ''.join(quoted))
242
243
244
def zero_fill(matchobj):
    """Return the matched text left-padded with zeros to 8 characters

    :param matchobj: a regular expression match object
    """
    matched = matchobj.group()
    return matched.zfill(8)
246
247
248
# Matches runs of digits. A raw string is used because "\d" is not a valid
# escape sequence in a regular string literal
num_sort_regex = re.compile(r'\d+')
249
250
# Declare `sortable_title` as public for AccessControl.
# NOTE(review): the declaration is made for 'Products.bika.utils' - confirm
# this is the dotted name this module is actually imported under
ModuleSecurityInfo('Products.bika.utils').declarePublic('sortable_title')
251
252
253
def sortable_title(portal, title):
    """Convert title to sortable title

    The title is lowercased and stripped, numbers are zero filled with
    `zero_fill` and the result is truncated to 30 characters.

    :param portal: object providing `plone_utils.getSiteEncoding()`
    :param title: the title to convert
    :return: the sortable title, '' if no title was given
    """
    if not title:
        return ''

    site_charset = portal.plone_utils.getSiteEncoding()
    sortable = str(title.lower().strip())
    # Replace numbers with zero filled numbers
    sortable = num_sort_regex.sub(zero_fill, sortable)
    # Truncate to prevent bloat. The charsets are tried one after the other
    # until the value can be decoded and encoded back
    for candidate in (site_charset, 'latin-1', 'utf-8'):
        try:
            sortable = safe_unicode(sortable, candidate)[:30]
            sortable = sortable.encode(site_charset or 'utf-8')
            break
        except UnicodeError:
            # Try with the next charset
            continue
        except TypeError:
            # If we get a TypeError if we already have a unicode string
            sortable = sortable[:30]
            break
    return sortable
276
277
# TODO Remove this function
def logged_in_client(context, member=None):
    """Return the result of `api.get_current_client()`

    :param context: not used
    :param member: not used
    """
    return api.get_current_client()
280
281
def changeWorkflowState(content, wf_id, state_id, **kw):
    """Change the workflow state of an object
    @param content: Content obj which state will be changed
    @param wf_id: id of the workflow that holds the state
    @param state_id: name of the state to put on content
    @param kw: "action" and "actor" are stored in the new workflow status.
        The actor defaults to the id of the current user
    @return: True if succeed. False if the workflow cannot be found
    @raise WorkflowException: if the workflow has no state `state_id`
    """
    wf_tool = api.get_tool("portal_workflow")
    workflow = wf_tool.getWorkflowById(wf_id)
    if not workflow:
        logger.error("%s: Cannot find workflow id %s" % (content, wf_id))
        return False

    # The status record that is set for the content
    status = dict(
        action=kw.get("action", None),
        actor=kw.get("actor", api.get_current_user().id),
        comments="Setting state to %s" % state_id,
        review_state=state_id,
        time=DateTime(),
    )

    # Get old and new state info
    source_state = workflow._getWorkflowStateOf(content)
    target_state = workflow.states.get(state_id, None)
    if target_state is None:
        message = "Destination state undefined: {}".format(state_id)
        raise WorkflowException(message)

    # Change status and update permissions
    wf_tool.setStatusOf(wf_id, content, status)
    workflow.updateRoleMappingsFor(content)

    # Notify the object has been transitioned
    event = AfterTransitionEvent(
        content, workflow, source_state, target_state, None, status, None)
    notify(event)

    # Map changes to catalog
    content.reindexObject(
        idxs=["allowedRolesAndUsers", "review_state", "is_active"])
    return True
321
322
323
def tmpID():
    """Return 16 bytes from `os.urandom` as 32 hexadecimal digits
    """
    from binascii import hexlify
    random_bytes = os.urandom(16)
    return hexlify(random_bytes)
326
327
328
def isnumber(s):
    """Return the result of `api.is_floatable` for the value passed in

    :param s: the value to check
    """
    return api.is_floatable(s)
330
331
332
def senaite_url_fetcher(url):
    """Uses plone.subrequest to fetch an internal image resource.

    If the URL points to an external resource, the URL is handed
    to weasyprint.default_url_fetcher.

    Please see these links for details:

        - https://github.com/plone/plone.subrequest
        - https://pypi.python.org/pypi/plone.subrequest
        - https://github.com/senaite/senaite.core/issues/538

    :param url: the URL of the resource to fetch
    :returns: For external URLs, the result of the default URL fetcher.
        For local URLs, a dict with the following keys:

        * ``string``: the body of the subrequest response
        * ``filename``: the last path segment of the URL
        * ``mime_type``: the MIME type guessed from the URL (may be None)
        * ``redirected_url``: the URL passed in
    """

    logger.info("Fetching URL '{}' for WeasyPrint".format(url))

    # get the physical path from the URL
    request = api.get_request()
    host = request.get_header("HOST")
    path = "/".join(request.physicalPathFromURL(url))

    # look up the object the path points to
    portal = api.get_portal()
    context = portal.restrictedTraverse(path, None)

    # We double check here to avoid an edge case, where we have the same path
    # as well in our local site, e.g. we have `/senaite/img/systems/senaite.png`,
    # but the user requested http://www.ridingbytes.com/img/systems/senaite.png:
    #
    # "/".join(request.physicalPathFromURL("http://www.ridingbytes.com/img/systems/senaite.png"))
    # '/senaite/img/systems/senaite.png'
    is_local = context is not None and host in url
    if not is_local:
        logger.info("URL is external, passing over to the default URL fetcher...")
        return default_url_fetcher(url)

    logger.info("URL is local, fetching data by path '{}' via subrequest".format(path))

    # get the data via an authenticated subrequest
    response = subrequest(path)

    # Prepare the return data as required by WeasyPrint
    return {
        "string": response.getBody(),
        "filename": url.split("/")[-1],
        "mime_type": mimetypes.guess_type(url)[0],
        "redirected_url": url,
    }
401
402
403
def createPdf(htmlreport, outfile=None, css=None, images=None):
    """create a PDF from some HTML.
    htmlreport: rendered html
    outfile: pdf filename; if supplied, caller is responsible for creating
             and removing it.
    css: URL (http:// or https://) of a css file to download, or the path
         of a local css file
    images: A dictionary containing possible URLs (keys) and local filenames
            (values) with which they may to be replaced during rendering.
    returns: the data of the generated PDF
    # WeasyPrint will attempt to retrieve images directly from the URL
    # referenced in the HTML report, which may refer back to a single-threaded
    # (and currently occupied) zeoclient, hanging it.  All image source
    # URL's referenced in htmlreport should be local files.
    """
    css_def = ''
    if css:
        if css.startswith(("http://", "https://")):
            # Read the remote stylesheet directly, no temporary copy needed
            remote = urlopen(css)
            try:
                css_def = remote.read()
            finally:
                remote.close()
        else:
            with open(css, 'r') as cssfile:
                css_def = cssfile.read()

    htmlreport = to_utf8(htmlreport)

    for (key, val) in (images or {}).items():
        htmlreport = htmlreport.replace(key, val)

    # render
    htmlreport = to_utf8(htmlreport)
    renderer = HTML(string=htmlreport, url_fetcher=senaite_url_fetcher, encoding='utf-8')

    # mkstemp creates the file, so the name cannot be taken by someone else
    # in between (as it can happen with mktemp)
    use_tempfile = not outfile
    if use_tempfile:
        handle, pdf_fn = tempfile.mkstemp(suffix=".pdf")
        os.close(handle)
    else:
        pdf_fn = outfile

    try:
        if css:
            renderer.write_pdf(pdf_fn, stylesheets=[CSS(string=css_def)])
        else:
            renderer.write_pdf(pdf_fn)
        # return file data
        with open(pdf_fn, "rb") as pdf_file:
            return pdf_file.read()
    finally:
        # The temporary file is removed even if the rendering failed. A file
        # supplied by the caller is left alone
        if use_tempfile and os.path.exists(pdf_fn):
            os.remove(pdf_fn)
453
454
455
def attachPdf(mimemultipart, pdfreport, filename=None):
    """Attach the PDF data base64 encoded to the multipart message

    :param mimemultipart: the message the attachment is added to
    :param pdfreport: the data of the PDF
    :param filename: name of the attachment without the ".pdf" extension.
        A `tmpID()` is used if no filename is given
    """
    attachment = MIMEBase('application', "pdf")
    disposition = 'attachment; filename="%s.pdf"' % (filename or tmpID())
    attachment.add_header('Content-Disposition', disposition)
    attachment.set_payload(pdfreport)
    Encoders.encode_base64(attachment)
    mimemultipart.attach(attachment)
462
463
464
def currency_format(context, locale):
    """Return a function that renders a value as "<symbol> <value>"

    The symbol is the one the locale defines for the currency returned by
    `context.bika_setup.getCurrency()`. Values are rendered with 2 decimals.

    :param context: object providing `bika_setup`
    :param locale: the locale id handed over to `locales.getLocale`
    """
    locale_obj = locales.getLocale(locale)
    currency = context.bika_setup.getCurrency()
    symbol = locale_obj.numbers.currencies[currency].symbol

    def format_value(val):
        return '%s %0.2f' % (symbol, val)

    return format_value
472
473
474
def getHiddenAttributesForClass(classname):
    """Return the hidden attributes registered for the class name

    The registry record 'bika.lims.hiddenattributes' is read as a sequence
    of entries whose first item is a class name.

    :param classname: the class name to look for
    :return: the remaining items of the first entry for the class name.
        An empty list if there is no such entry or the lookup failed
    """
    try:
        registry = queryUtility(IRegistry)
        entries = registry.get('bika.lims.hiddenattributes', ())
        if entries is not None:
            for entry in entries:
                if entry[0] == classname:
                    return entry[1:]
    except Exception:
        # Best effort: a failing lookup is logged only
        logger.warning(
            'Probem accessing optionally hidden attributes in registry')

    return []
487
488
489
def dicts_to_dict(dictionaries, key_subfieldname):
    """Convert a list of dictionaries into a dictionary of dictionaries.

    key_subfieldname must exist in each Record's subfields and have a value,
    which will be used as the key for the new dictionary. If a key is duplicated,
    the earlier value will be overwritten.
    """
    return {
        record[key_subfieldname]: record
        for record in dictionaries
    }
500
501
502
def format_supsub(text):
    """
    Mainly used for Analysis Service's unit. Transform the text adding
    sub and super html scripts:
    For super-scripts, use ^ char
    For sub-scripts, use _ char
    The expression "cm^2" will be translated to "cm<sup>2</sup>" and the
    expression "b_(n-1)" will be translated to "b<sub>n-1</sub>".
    The expression "n_(fibras)/cm^3" will be translated as
    "n<sub>fibras</sub>/cm<sup>3</sup>"
    :param text: text to be formatted
    """
    out = []
    # Closing tags of the scripts that are still open
    pending_tags = []
    # What to write for each parenthesis that is still open
    clauses = []
    # True while the previous char was "^" or "_" (and at the very beginning)
    insubsup = True
    for c in str(text).strip():
        if c in ('^', '_'):
            tag = 'sup' if c == '^' else 'sub'
            pending_tags.append('</%s>' % tag)
            out.append('<%s>' % tag)
            insubsup = True
            continue

        if c == '(':
            if insubsup:
                # Parenthesis that only groups the script: not written
                clauses.append('')
            else:
                out.append(c)
                clauses.append(')')

        elif c == ')':
            if clauses:
                out.append(clauses.pop())
                if pending_tags:
                    out.append(pending_tags.pop())

        elif c == ' ':
            if insubsup:
                out.append(pending_tags.pop())
            else:
                out.append(c)

        elif c in ('+', '-'):
            # Outside parentheses, a sign closes the open script
            if not clauses and pending_tags:
                out.append(pending_tags.pop())
            out.append(c)

        else:
            out.append(c)

        insubsup = False

    # Close the scripts that are still open, the last opened first
    out.extend(reversed(pending_tags))

    return ''.join(out)
564
565
566
def checkPermissions(permissions=(), obj=None):
    """
    Checks if a user has permissions for a given object.

    Args:
        permissions: The permissions the current user must be compliant with
        obj: The object for which the permissions apply

    Returns:
        True if the user complies with all the permissions for the given
        object. False if no object is given and an empty string if the user
        does not comply with one of the permissions.
    """
    if not obj:
        return False
    sm = getSecurityManager()
    for perm in permissions:
        if not sm.checkPermission(perm, obj):
            return ''
    return True
585
586
587
def getFromString(obj, string, default=None):
    """Resolve a dotted attribute path against the object

    Each segment of the path is looked up with `api.safe_getattr` on the
    result of the previous segment.

    :param obj: the object to start with
    :param string: the attribute names, separated by dots
    :param default: returned if the lookup ends in a falsy value
    :return: the value the path resolves to. Note that `default` is
        returned for any falsy value (e.g. 0 or '') found on the way
    """
    value = obj
    for name in string.split('.'):
        value = api.safe_getattr(value, name, default=None)
        if not value:
            return default
    return value or default
595
596
597
def user_fullname(obj, userid):
    """
    Returns the user full name as string.

    The full name of the first Contact found for the user name takes
    precedence over the "fullname" property of the member. The user id is
    returned if there is no member or no full name at all.
    """
    member = obj.portal_membership.getMemberById(userid)
    if member is None:
        return userid
    member_fullname = member.getProperty('fullname')
    catalog = getToolByName(obj, 'portal_catalog')
    brains = catalog(portal_type='Contact', getUsername=userid)
    if brains:
        contact_fullname = brains[0].getObject().getFullname()
        if contact_fullname:
            return contact_fullname
    return member_fullname or userid
609
610
611
def user_email(obj, userid):
    """
    This function returns the user email as string.

    The email address of the first Contact found for the user name takes
    precedence over the "email" property of the member. An empty string is
    returned if none of them is set.
    """
    member = obj.portal_membership.getMemberById(userid)
    if member is None:
        # NOTE(review): the user id (not an email) is returned if there is
        # no member - confirm this is what the callers expect
        return userid
    member_email = member.getProperty('email')
    catalog = getToolByName(obj, 'portal_catalog')
    brains = catalog(portal_type='Contact', getUsername=userid)
    if brains:
        contact_email = brains[0].getObject().getEmailAddress()
        if contact_email:
            return contact_email
    return member_email or ''
623
624
625
def measure_time(func_to_measure):
    """
    This decorator allows to measure the execution time
    of a function and prints it to the console.
    :param func_to_measure: function to be decorated
    :return: a wrapper that returns what the decorated function returns
    """
    from functools import wraps

    # `__name__` is available in Python 2 and 3 (`func_name` is not)
    func_name = getattr(func_to_measure, "__name__", repr(func_to_measure))

    # Keep the name and the docstring of the decorated function
    @wraps(func_to_measure)
    def wrap(*args, **kwargs):
        start_time = time()
        return_value = func_to_measure(*args, **kwargs)
        finish_time = time()
        log = "%s took %0.4f seconds. start_time = %0.4f - finish_time = %0.4f\n" % (
            func_name, finish_time - start_time, start_time, finish_time)
        print(log)
        return return_value
    return wrap
642
643
644
def copy_field_values(src, dst, ignore_fieldnames=None, ignore_fieldtypes=None):
    """Copy the field values from the source to the destination object

    Computed fields, fields without a value and fields that do not exist in
    the schema of the destination are skipped. The field "id" is never
    copied.

    :param src: the object to read the values from
    :param dst: the object the values are set to
    :param ignore_fieldnames: names of the fields that must not be copied
    :param ignore_fieldtypes: types of the fields that must not be copied
    """
    # Work with a copy: the list of the caller must not be modified
    ignore_fields = list(ignore_fieldnames or [])
    ignore_types = ignore_fieldtypes or []
    if 'id' not in ignore_fields:
        ignore_fields.append('id')

    src_schema = src.Schema()
    dst_schema = dst.Schema()

    for field in src_schema.fields():
        if IComputedField.providedBy(field):
            continue
        fieldname = field.getName()
        if fieldname in ignore_fields \
                or field.type in ignore_types \
                or fieldname not in dst_schema:
            continue
        value = field.get(src)
        if value:
            dst_schema[fieldname].set(dst, value)
664
665
666
def get_link(href, value=None, **kwargs):
    """
    Returns a well-formed link. If href is None/empty, returns an empty string
    :param href: value to be set for attribute href
    :param value: the text to be displayed. If None, the href itself is used
    :param kwargs: additional attributes and values
    :return: a well-formed html anchor
    """
    if not href:
        return ""
    # The href as passed in is displayed, not the one with the token
    anchor_value = value or href
    attr = render_html_attributes(**kwargs)
    # Add a CSRF token
    if href.startswith("http"):
        href = addTokenToUrl(href)
    return '<a href="{}" {}>{}</a>'.format(href, attr, anchor_value)
682
683
684
def get_link_for(obj, **kwargs):
    """Returns a well-formed html anchor to the object

    :param obj: the object to link to. Its URL is used as the href and its
        title as the text of the link
    :param kwargs: additional attributes and values
    :return: the html anchor, or an empty string if no object is given
    """
    if not obj:
        return ""
    return get_link(
        href=api.get_url(obj),
        value=api.get_title(obj),
        **kwargs)
692
693
694
def get_email_link(email, value=None):
    """
    Returns a well-formed link to an email address. If email is None/empty,
    returns an empty string
    :param email: email address
    :param value: text to be displayed. If None, the email itself is used
    :return: a well-formatted html anchor
    """
    if not email:
        return ""
    mailto = 'mailto:{}'.format(email)
    return get_link(mailto, value or email)
707
708
709
def get_image(name, **kwargs):
    """Returns a well-formed image
    :param name: file name of the image
    :param kwargs: additional attributes and values
    :return: a well-formed html img
    """
    if not name:
        return ""
    portal = api.get_portal()
    theme = portal.restrictedTraverse("@@senaite_theme")
    # Theme icons are looked up by the file name without extension
    icon_name = os.path.splitext(name)[0]
    if icon_name in theme.icons():
        kwargs.setdefault("width", "16")
        return theme.icon_tag(icon_name, **kwargs)
    # Not a theme icon: image from the static resources
    portal_url = api.get_url(portal)
    attr = render_html_attributes(**kwargs)
    template = '<img src="{}/++resource++bika.lims.images/{}" {}/>'
    return template.format(portal_url, name, attr)
728
729
730
def get_progress_bar_html(percentage):
    """Returns an html that represents a progress bar

    :param percentage: the progress; 0 is used for empty values
    """
    template = (
        '<div class="progress md-progress">'
        '<div class="progress-bar" style="width: {0}%">{0}%</div>'
        '</div>'
    )
    return template.format(percentage or 0)
736
737
738
def render_html_attributes(**kwargs):
    """Returns a string representation of attributes for html entities

    "css_class" can be used as the name of the attribute "class", which
    cannot be passed in as a keyword.

    :param kwargs: attributes and values
    :return: a well-formed string representation of attributes"""
    # Only the names are translated: the values are rendered as passed in,
    # even if they contain the text "css_class"
    attr = [
        '{}="{}"'.format(key.replace("css_class", "class"), val)
        for key, val in kwargs.items()
    ]
    return " ".join(attr)
746
747
748
def get_registry_value(key, default=None):
    """
    Gets the utility for IRegistry and returns the value for the key passed in.
    If there is no value for the key passed in, returns default value
    :param key: the key in the registry to look for
    :param default: default value if the key is not registered
    :return: value in the registry for the key passed in
    """
    return queryUtility(IRegistry).get(key, default)
758
759
760
def check_permission(permission, obj):
    """
    Returns if the current user has rights for the permission passed in against
    the obj passed in
    :param permission: name of the permission
    :param obj: the object to check the permission against for the current user
    :return: 1 if the user has rights for this permission for the passed in obj
    """
    membership = api.get_tool('portal_membership')
    target = api.get_object(obj)
    return membership.checkPermission(permission, target)
771
772
773
def to_int(value, default=0):
    """
    Tries to convert the value passed in as an int. If no success, returns the
    default value passed in
    :param value: the string to convert to integer
    :param default: the default fallback
    :return: int representation of the value passed in
    """
    # The default is converted too; 0 if it cannot be converted either
    for candidate in (value, default):
        try:
            return int(candidate)
        except (TypeError, ValueError):
            continue
    return 0
785
786
787
def get_strings(data):
    """
    Convert unicode values to strings even if they belong to lists or dicts.
    :param data: an object.
    :return: The object with all unicode values converted to string.
    """
    # if this is a unicode string, return its string representation
    # (`six.text_type` is `unicode` on Python 2 and `str` on Python 3)
    if isinstance(data, six.text_type):
        return data.encode('utf-8')

    # if this is a list of values, return list of string values
    if isinstance(data, list):
        return [get_strings(item) for item in data]

    # if this is a dictionary, return dictionary of string keys and values
    if isinstance(data, dict):
        return {
            get_strings(key): get_strings(value)
            for key, value in six.iteritems(data)
        }

    # if it's anything else, return it in its original form
    return data
810
811
812
def get_unicode(data):
    """
    Convert string values to unicode even if they belong to lists or dicts.
    :param data: an object.
    :return: The object with all string values converted to unicode.
    """
    # a common string is converted to its unicode representation
    if isinstance(data, str):
        return safe_unicode(data)

    # the items of a list are converted one by one
    if isinstance(data, list):
        return [get_unicode(element) for element in data]

    # both the keys and the values of a dictionary are converted
    if isinstance(data, dict):
        return dict(
            (get_unicode(name), get_unicode(item))
            for name, item in six.iteritems(data)
        )

    # anything else is returned in its original form
    return data
834
835
836
def is_bika_installed():
    """Check if Bika LIMS is installed in the Portal

    :return: the result of `isProductInstalled("bika.lims")` of the
        `portal_quickinstaller` tool
    """
    # Same tool lookup as used elsewhere in this module
    qi = api.get_tool("portal_quickinstaller")
    return qi.isProductInstalled("bika.lims")
841
842
843
def get_display_list(brains_or_objects=None, none_item=False):
    """
    Returns a DisplayList with the items sorted by Title
    :param brains_or_objects: list of brains or objects
    :param none_item: adds an item with empty uid and text "Select.." in pos 0
    :return: DisplayList (uid, title) sorted by title ascending
    :rtype: DisplayList
    """
    if brains_or_objects is None:
        brains_or_objects = []

    items = []
    for brain in brains_or_objects:
        uid = api.get_uid(brain)
        if not uid:
            # Items without UID are not listed
            continue
        items.append((uid, api.get_title(brain)))

    # Sort items by title ascending. A key function gives the same (stable)
    # order as the former cmp function, but is also accepted by Python 3
    items.sort(key=lambda item: item[1])

    # Add the first item?
    if none_item:
        items.insert(0, ('', t('Select...')))

    return DisplayList(items)
870
871
872
def to_choices(display_list):
    """Converts a display list to a choices list

    :param display_list: object whose `items()` returns (value, text) pairs
    :return: list of dicts with the keys "ResultValue" and "ResultText".
        An empty list if no display list (or an empty one) is given
    """
    if not display_list:
        return []

    # A list is built explicitly: `map` returns an iterator on Python 3
    return [
        {"ResultValue": item[0], "ResultText": item[1]}
        for item in display_list.items()
    ]
883
884
885
def chain(obj):
    """Generator to walk the acquisition chain of object, considering that it
    could be a function.

    If the thing we are accessing is actually a bound method on an instance,
    then after we've checked the method itself, get the instance it's bound to
    using im_self, so that we can continue to walk up the acquisition chain
    from it (incidentally, this is why we can't just use aq_chain()).
    """
    context = aq_inner(obj)

    while context is not None:
        yield context

        instance = getattr(context, "im_self", None)
        if instance is None:
            # Don't use aq_inner() since portal_factory (and probably other)
            # things, depends on being able to wrap itself in a fake context.
            context = aq_parent(context)
        else:
            # Bound method: go on with the instance it is bound to
            context = aq_inner(instance)
906
907
908
def get_client(obj):
    """Returns the client the object passed-in belongs to, if any

    This walks the acquisition chain up until we find something which provides
    either IClient or IClientAwareMixin
    """
    for item in chain(obj):
        if IClient.providedBy(item):
            return item
        if IClientAwareMixin.providedBy(item):
            # ClientAwareMixin can return a Client, even if there is no client
            # in the acquisition chain
            return item.getClient()

    return None
923