Passed
Push — develop ( 82f613...a5d438 )
by Plexxi
06:54 queued 03:28
created

BufferedDispatcher.name()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 1
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import time
17
18
import eventlet
19
import Queue
20
21
from st2common import log as logging
22
23
__all__ = [
24
    'BufferedDispatcher'
25
]
26
27
# If the thread pool has been occupied with no empty threads for more than this number of seconds
28
# a message will be logged
29
# Compared in BufferedDispatcher._flush_now against the time elapsed since the pool last had a
# free worker; once exceeded, POOL_BUSY_LOG_MESSAGE is logged.
POOL_BUSY_THRESHOLD_SECONDS = 60
30
31
POOL_BUSY_LOG_MESSAGE = """
32
BufferedDispatcher pool "%s" has been busy with no free threads for more than %s seconds. If there \
33
are server resources available, consider increasing the dispatcher pool size in the config.
34
""".strip()
35
36
# Module-level logger; "logging" here is st2common.log (see the import above), not the stdlib
# module.
LOG = logging.getLogger(__name__)
37
38
39
class BufferedDispatcher(object):
    """
    Dispatcher which buffers (handler, args) work items in a queue and runs them on a
    size-limited eventlet green pool.

    Work is handed to the pool either immediately from ``dispatch`` (when a worker is free) or
    later by a background monitor greenthread which polls the buffer and the pool.
    """

    def __init__(self, dispatch_pool_size=50, monitor_thread_empty_q_sleep_time=5,
                 monitor_thread_no_workers_sleep_time=1, name=None):
        """
        :param dispatch_pool_size: Size of the green pool used to run handlers.
        :param monitor_thread_empty_q_sleep_time: Seconds the monitor greenthread sleeps between
                                                  checks while the work buffer is empty.
        :param monitor_thread_no_workers_sleep_time: Seconds the monitor greenthread sleeps
                                                     between checks while the pool has no free
                                                     workers.
        :param name: Optional name used in log messages and ``repr``. When not provided (or
                     falsy), ``id(self)`` is used instead.
        """
        self._pool_limit = dispatch_pool_size
        self._dispatcher_pool = eventlet.GreenPool(dispatch_pool_size)
        self._monitor_thread_empty_q_sleep_time = monitor_thread_empty_q_sleep_time
        self._monitor_thread_no_workers_sleep_time = monitor_thread_no_workers_sleep_time
        self._name = name

        self._work_buffer = Queue.Queue()

        # Internal attributes we use to track how long the pool is busy without any free workers
        self._pool_last_free_ts = time.time()

        # Spawn the monitor last so that _flush can never run against an instance which is
        # missing the attributes assigned above.
        self._dispatch_monitor_thread = eventlet.greenthread.spawn(self._flush)

    @property
    def name(self):
        """
        Return the user-provided name, falling back to ``id(self)`` when no name was given.
        """
        return self._name or id(self)

    def dispatch(self, handler, *args):
        """
        Buffer a work item and try to hand buffered work to the pool right away.

        :param handler: Callable to run on the pool.
        :param args: Positional arguments passed to ``handler``.
        """
        self._work_buffer.put((handler, args), block=True, timeout=1)
        self._flush_now()

    def shutdown(self):
        """
        Kill the monitor greenthread. Handlers already spawned on the pool are not touched here.
        """
        self._dispatch_monitor_thread.kill()

    def _flush(self):
        """
        Body of the monitor greenthread: wait until there is buffered work and a free worker,
        then flush. Loops until the greenthread is killed.
        """
        while True:
            while self._work_buffer.empty():
                eventlet.greenthread.sleep(self._monitor_thread_empty_q_sleep_time)
            while self._dispatcher_pool.free() <= 0:
                eventlet.greenthread.sleep(self._monitor_thread_no_workers_sleep_time)
            self._flush_now()

    def _flush_now(self):
        """
        Move as many buffered work items to the pool as there are free workers.

        If the pool has no free workers nothing is spawned; when that has been the case for at
        least POOL_BUSY_THRESHOLD_SECONDS a message is logged.
        """
        if self._dispatcher_pool.free() <= 0:
            now = time.time()

            if (now - self._pool_last_free_ts) >= POOL_BUSY_THRESHOLD_SECONDS:
                # Pass the values as logging arguments so the message is only formatted when the
                # record is actually emitted.
                LOG.info(POOL_BUSY_LOG_MESSAGE, self.name, POOL_BUSY_THRESHOLD_SECONDS)

            return

        # Update the time of when there were free threads available
        self._pool_last_free_ts = time.time()

        # Re-check free() on every iteration so spawn() is never called on a full pool.
        while not self._work_buffer.empty() and self._dispatcher_pool.free() > 0:
            (handler, args) = self._work_buffer.get_nowait()
            self._dispatcher_pool.spawn(handler, *args)

    def __repr__(self):
        free_count = self._dispatcher_pool.free()
        values = (self.name, self._pool_limit, free_count, self._monitor_thread_empty_q_sleep_time,
                  self._monitor_thread_no_workers_sleep_time)
        return ('<BufferedDispatcher name=%s,dispatch_pool_size=%s,free_threads=%s,'
                'monitor_thread_empty_q_sleep_time=%s,monitor_thread_no_workers_sleep_time=%s>' %
                values)
97