Passed
Push — master ( 2035c6...7cdfd5 )
by Jordi
05:03
created

bika.lims.decorators.synchronized()   A

Complexity

Conditions 1

Size

Total Lines 21
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 21
rs 9.6
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import cProfile
22
import json
23
import os
24
import threading
25
import time
26
from functools import wraps
27
28
from bika.lims import api
29
from bika.lims import logger
30
from senaite.core.supermodel.interfaces import ISuperModel
31
from zope.component import queryAdapter
32
33
34
def XXX_REMOVEME(func):
    """Decorator for dead code removal

    Flags a method as scheduled for removal: every call to the decorated
    method raises instead of running the original code, so that remaining
    callers show up immediately.

    :param func: method to flag; it is never executed
    :return: replacement method that always raises
    :raises RuntimeError: on every call of the decorated method
    """
    @wraps(func)
    def decorator(self, *args, **kwargs):
        # `__name__` works on Python 2 and 3, while `func_name` is only
        # available on Python 2 functions
        msg = "~~~~~~~ XXX REMOVEME marked method called: {}.{}".format(
            self.__class__.__name__, func.__name__)
        raise RuntimeError(msg)
    return decorator
44
45
46 View Code Duplication
def returns_json(func):
    """Decorator for functions which return JSON

    The first positional argument of the decorated callable is used as the
    instance. If it provides a `request` attribute, the "Content-Type"
    header of `request.response` is set to "application/json" before the
    wrapped function is called. The return value of the wrapped function is
    serialized with `json.dumps`.

    :param func: callable returning a JSON serializable value
    :return: callable returning the JSON encoded result
    """
    # NOTE(review): unlike the other decorators of this module, the wrapper
    # is not decorated with `functools.wraps`. Kept as-is, because adding it
    # changes the `__name__`/`__doc__` exposed by decorated methods --
    # confirm that nothing depends on this before changing it.
    def decorator(*args, **kwargs):
        instance = args[0]
        request = getattr(instance, 'request', None)
        # The lookup above falls back to None: skip the header instead of
        # failing with an AttributeError when there is no request
        if request is not None:
            request.response.setHeader("Content-Type", "application/json")
        result = func(*args, **kwargs)
        return json.dumps(result)
    return decorator
56
57
58
def returns_super_model(func):
    """Decorator to return standard content objects as SuperModels

    The return value of the decorated function is converted as follows:

    - a list or tuple is converted item by item and returned as a list
    - any other value is converted directly

    Values which already are SuperModels are passed through unchanged.

    :param func: function returning a portal object, a SuperModel or a
        list/tuple of those
    :return: function returning SuperModel(s) instead
    :raises TypeError: if a value is neither a SuperModel nor a portal
        object according to `api.is_object`
    """
    def to_super_model(obj):
        # avoid circular imports
        from senaite.core.supermodel import SuperModel

        # Object is already a SuperModel
        if isinstance(obj, SuperModel):
            return obj

        # Only portal objects are supported
        if not api.is_object(obj):
            raise TypeError("Expected a portal object, got '{}'"
                            .format(type(obj)))

        # Wrap the object into a specific Publication Object Adapter
        uid = api.get_uid(obj)
        portal_type = api.get_portal_type(obj)

        # Prefer an adapter registered under the portal type name and fall
        # back to the generic SuperModel if there is none
        adapter = queryAdapter(uid, ISuperModel, name=portal_type)
        if adapter is None:
            return SuperModel(uid)
        return adapter

    @wraps(func)
    def wrapper(*args, **kwargs):
        obj = func(*args, **kwargs)
        if isinstance(obj, (list, tuple)):
            # Build the list eagerly: `map` is lazy on Python 3, which
            # would hand out an iterator and defer conversion errors
            return [to_super_model(item) for item in obj]
        return to_super_model(obj)

    return wrapper
91
92
93
def profileit(path=None):
    """cProfile decorator to profile a function

    The profiler statistics are printed to stdout after each call of the
    decorated function. If `path` is given, the statistics are additionally
    dumped to that file (a leading "~" is expanded).

    :param path: output file path
    :type path: str
    :return: Function
    """

    def inner(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            prof = cProfile.Profile()
            retval = prof.runcall(func, *args, **kwargs)
            # `print_stats` writes the report itself and returns None, so
            # printing its return value only adds a stray "None" line
            prof.print_stats()
            if path is not None:
                prof.dump_stats(os.path.expanduser(path))
            return retval
        return wrapper
    return inner
114
115
116
def timeit(threshold=0, show_args=False):
    """Decorator to log the execution time of a function

    The wall-clock duration of each call is measured with `time.time()` and
    logged on info level whenever it is greater than `threshold`.

    :param threshold: durations (in seconds) up to this value are not logged
    :param show_args: also log the positional arguments of the call
    :return: decorator which returns the value of the wrapped function
    """

    def inner(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            started = time.time()
            result = func(*args, **kwargs)
            stopped = time.time()
            elapsed = float(stopped - started)
            if elapsed > threshold:
                # Only the message differs between both logging modes
                if show_args:
                    message = "Execution of '{}{}' took {:2f}s".format(
                        func.__name__, args, elapsed)
                else:
                    message = "Execution of '{}' took {:2f}s".format(
                        func.__name__, elapsed)
                logger.info(message)
            return result
        return wrapper
    return inner
137
138
139
def synchronized(max_connections=2, verbose=0):
    """Synchronize function call via semaphore

    A single `threading.BoundedSemaphore` is created per decorator call and
    shared by all functions decorated with the returned decorator. At most
    `max_connections` calls run at the same time, further calls block until
    a running call has finished.

    :param max_connections: number of calls allowed to run concurrently
    :param verbose: passed through to `threading.BoundedSemaphore`
    :return: decorator which returns the value of the wrapped function
    """
    semaphore = threading.BoundedSemaphore(max_connections, verbose=verbose)

    def inner(func):
        logger.debug("Semaphore for {} -> {}".format(func, semaphore))

        @wraps(func)
        def wrapper(*args, **kwargs):
            logger.info("==> {}::Acquire Semaphore ...".format(
                func.__name__))
            # Acquire before entering the `try` block: if logging or the
            # acquisition itself fails, the `finally` clause must not
            # release a semaphore that was never acquired
            semaphore.acquire()
            try:
                return func(*args, **kwargs)
            finally:
                logger.info("<== {}::Release Semaphore ...".format(
                    func.__name__))
                semaphore.release()

        return wrapper
    return inner
160