Passed
Push — master ( 4749ba...200efe )
by W
07:55
created

Querier._fire_queries()   B

Complexity

Conditions 5

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 10
rs 8.5454
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from st2actions.container.service import RunnerContainerService
23
from st2common.runners.base import get_runner
24
from st2common import log as logging
25
from st2common.constants import action as action_constants
26
from st2common.persistence.executionstate import ActionExecutionState
27
from st2common.persistence.liveaction import LiveAction
28
from st2common.services import executions
29
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
30
from st2common.util import date as date_utils
31
32
LOG = logging.getLogger(__name__)
33
34
__all__ = [
35
    'Querier',
36
    'QueryContext'
37
]
38
39
40
@six.add_metaclass(abc.ABCMeta)
41
class Querier(object):
42
    delete_state_object_on_error = True
43
44
    def __init__(self, threads_pool_size=10, query_interval=1, empty_q_sleep_time=5,
45
                 no_workers_sleep_time=1, container_service=None):
46
        self._query_threads_pool_size = threads_pool_size
47
        self._query_contexts = Queue.Queue()
48
        self._thread_pool = eventlet.GreenPool(self._query_threads_pool_size)
49
        self._empty_q_sleep_time = empty_q_sleep_time
50
        self._no_workers_sleep_time = no_workers_sleep_time
51
        self._query_interval = query_interval
52
        if not container_service:
53
            container_service = RunnerContainerService()
54
        self.container_service = container_service
55
        self._started = False
56
57
    def start(self):
58
        self._started = True
59
        while True:
60
            while self._query_contexts.empty():
61
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
62
            while self._thread_pool.free() <= 0:
63
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
64
            self._fire_queries()
65
66
    def add_queries(self, query_contexts=None):
67
        if query_contexts is None:
68
            query_contexts = []
69
        LOG.debug('Adding queries to querier: %s' % query_contexts)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
70
        for query_context in query_contexts:
71
            self._query_contexts.put((time.time(), query_context))
72
73
    def is_started(self):
74
        return self._started
75
76
    def _fire_queries(self):
77
        if self._thread_pool.free() <= 0:
78
            return
79
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
80
            (last_query_time, query_context) = self._query_contexts.get_nowait()
81
            if time.time() - last_query_time < self._query_interval:
82
                self._query_contexts.put((last_query_time, query_context))
83
                continue
84
            else:
85
                self._thread_pool.spawn(self._query_and_save_results, query_context)
86
87
    def _query_and_save_results(self, query_context):
88
        execution_id = query_context.execution_id
89
        actual_query_context = query_context.query_context
90
91
        LOG.debug('Querying external service for results. Context: %s' % actual_query_context)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
92
        try:
93
            (status, results) = self.query(execution_id, actual_query_context)
94
        except:
95
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
96
            if self.delete_state_object_on_error:
97
                self._delete_state_object(query_context)
98
                LOG.debug('Removed state object %s.', query_context)
99
            return
100
101
        liveaction_db = None
102
        try:
103
            liveaction_db = self._update_action_results(execution_id, status, results)
104
        except Exception:
105
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
106
            if self.delete_state_object_on_error:
107
                self._delete_state_object(query_context)
108
                LOG.debug('Removed state object %s.', query_context)
109
            return
110
111
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
112
            action_db = get_action_by_ref(liveaction_db.action)
113
114
            if action_db:
115
                if status != action_constants.LIVEACTION_STATUS_CANCELED:
116
                    self._invoke_post_run(liveaction_db, action_db)
117
            else:
118
                LOG.exception('Unable to invoke post run. Action %s '
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
119
                              'no longer exists.' % liveaction_db.action)
120
121
            self._delete_state_object(query_context)
122
123
            return
124
125
        self._query_contexts.put((time.time(), query_context))
126
127
    def _update_action_results(self, execution_id, status, results):
128
        liveaction_db = LiveAction.get_by_id(execution_id)
129
        if not liveaction_db:
130
            raise Exception('No DB model for liveaction_id: %s' % execution_id)
131
132
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
133
            liveaction_db.status = status
134
135
        liveaction_db.result = results
136
137
        # Action has completed, record end_timestamp
138
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
139
                not liveaction_db.end_timestamp):
140
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()
141
142
        # update liveaction, update actionexecution and then publish update.
143
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
144
        executions.update_execution(updated_liveaction)
145
        LiveAction.publish_update(updated_liveaction)
146
147
        return updated_liveaction
148
149
    def _invoke_post_run(self, actionexec_db, action_db):
150
        LOG.info('Invoking post run for action execution %s. Action=%s; Runner=%s',
151
                 actionexec_db.id, action_db.name, action_db.runner_type['name'])
152
153
        # Get an instance of the action runner.
154
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
155
        runner = get_runner(runnertype_db.runner_module)
156
157
        # Configure the action runner.
158
        runner.container_service = RunnerContainerService()
159
        runner.action = action_db
160
        runner.action_name = action_db.name
161
        runner.action_execution_id = str(actionexec_db.id)
162
        runner.entry_point = RunnerContainerService.get_entry_point_abs_path(
163
            pack=action_db.pack, entry_point=action_db.entry_point)
164
        runner.context = getattr(actionexec_db, 'context', dict())
165
        runner.callback = getattr(actionexec_db, 'callback', dict())
166
        runner.libs_dir_path = RunnerContainerService.get_action_libs_abs_path(
167
            pack=action_db.pack, entry_point=action_db.entry_point)
168
169
        # Invoke the post_run method.
170
        runner.post_run(actionexec_db.status, actionexec_db.result)
171
172
    def _delete_state_object(self, query_context):
173
        state_db = ActionExecutionState.get_by_id(query_context.id)
174
        if state_db is not None:
175
            try:
176
                LOG.info('Clearing state object: %s', state_db)
177
                ActionExecutionState.delete(state_db)
178
            except:
179
                LOG.exception('Failed clearing state object: %s', state_db)
180
181
    def query(self, execution_id, query_context):
182
        """
183
        This is the method individual queriers must implement.
184
        This method should return a tuple of (status, results).
185
186
        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
187
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.
188
        """
189
        pass
190
191
    def print_stats(self):
192
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
193
                 self._query_contexts.qsize())
194
195
196
class QueryContext(object):
197
    def __init__(self, obj_id, execution_id, query_context, query_module):
198
        self.id = obj_id
199
        self.execution_id = execution_id
200
        self.query_context = query_context
201
        self.query_module = query_module
202
203
    @classmethod
204
    def from_model(cls, model):
205
        return QueryContext(str(model.id), str(model.execution_id), model.query_context,
206
                            model.query_module)
207
208
    def __repr__(self):
209
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
210
                (self.id, self.execution_id, self.query_context))
211