Completed
Push — master ( fe66bc...2a24eb )
by Manas
18:11
created

st2actions.query.MistralCallbackHandler   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 40
Duplicated Lines 0 %
Metric Value
wmc 9
dl 0
loc 40
rs 10

2 Methods

Rating   Name   Duplication   Size   Complexity  
A _update_action_execution() 0 20 1
C callback() 0 17 8
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from st2actions.container.service import RunnerContainerService
23
from st2actions.runners import get_runner
24
from st2common import log as logging
25
from st2common.constants import action as action_constants
26
from st2common.persistence.executionstate import ActionExecutionState
27
from st2common.persistence.liveaction import LiveAction
28
from st2common.services import executions
29
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
30
from st2common.util import date as date_utils
31
32
LOG = logging.getLogger(__name__)
33
34
__all__ = [
35
    'Querier',
36
    'QueryContext'
37
]
38
39
40
@six.add_metaclass(abc.ABCMeta)
41
class Querier(object):
42
    def __init__(self, threads_pool_size=10, query_interval=1, empty_q_sleep_time=5,
43
                 no_workers_sleep_time=1, container_service=None):
44
        self._query_threads_pool_size = threads_pool_size
45
        self._query_contexts = Queue.Queue()
46
        self._thread_pool = eventlet.GreenPool(self._query_threads_pool_size)
47
        self._empty_q_sleep_time = empty_q_sleep_time
48
        self._no_workers_sleep_time = no_workers_sleep_time
49
        self._query_interval = query_interval
50
        if not container_service:
51
            container_service = RunnerContainerService()
52
        self.container_service = container_service
53
        self._started = False
54
55
    def start(self):
56
        self._started = True
57
        while True:
58
            while self._query_contexts.empty():
59
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
60
            while self._thread_pool.free() <= 0:
61
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
62
            self._fire_queries()
63
64
    def add_queries(self, query_contexts=None):
65
        if query_contexts is None:
66
            query_contexts = []
67
        LOG.debug('Adding queries to querier: %s' % query_contexts)
68
        for query_context in query_contexts:
69
            self._query_contexts.put((time.time(), query_context))
70
71
    def is_started(self):
72
        return self._started
73
74
    def _fire_queries(self):
75
        if self._thread_pool.free() <= 0:
76
            return
77
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
78
            (last_query_time, query_context) = self._query_contexts.get_nowait()
79
            if time.time() - last_query_time < self._query_interval:
80
                self._query_contexts.put((last_query_time, query_context))
81
                continue
82
            else:
83
                self._thread_pool.spawn(self._query_and_save_results, query_context)
84
85
    def _query_and_save_results(self, query_context):
86
        execution_id = query_context.execution_id
87
        actual_query_context = query_context.query_context
88
89
        LOG.debug('Querying external service for results. Context: %s' % actual_query_context)
90
        try:
91
            (status, results) = self.query(execution_id, actual_query_context)
92
        except:
93
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
94
            self._delete_state_object(query_context)
95
            LOG.debug('Remove state object %s.', query_context)
96
            return
97
98
        liveaction_db = None
99
        try:
100
            liveaction_db = self._update_action_results(execution_id, status, results)
101
        except Exception:
102
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
103
            self._delete_state_object(query_context)
104
            return
105
106
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
107
            action_db = get_action_by_ref(liveaction_db.action)
108
            if not action_db:
109
                LOG.exception('Unable to invoke post run. Action %s '
110
                              'no longer exists.' % liveaction_db.action)
111
                self._delete_state_object(query_context)
112
                return
113
            if status != action_constants.LIVEACTION_STATUS_CANCELED:
114
                self._invoke_post_run(liveaction_db, action_db)
115
            self._delete_state_object(query_context)
116
            return
117
118
        self._query_contexts.put((time.time(), query_context))
119
120
    def _update_action_results(self, execution_id, status, results):
121
        liveaction_db = LiveAction.get_by_id(execution_id)
122
        if not liveaction_db:
123
            raise Exception('No DB model for liveaction_id: %s' % execution_id)
124
125
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
126
            liveaction_db.status = status
127
128
        liveaction_db.result = results
129
130
        # Action has completed, record end_timestamp
131
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
132
                not liveaction_db.end_timestamp):
133
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()
134
135
        # update liveaction, update actionexecution and then publish update.
136
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
137
        executions.update_execution(updated_liveaction)
138
        LiveAction.publish_update(updated_liveaction)
139
140
        return updated_liveaction
141
142
    def _invoke_post_run(self, actionexec_db, action_db):
143
        LOG.info('Invoking post run for action execution %s. Action=%s; Runner=%s',
144
                 actionexec_db.id, action_db.name, action_db.runner_type['name'])
145
146
        # Get an instance of the action runner.
147
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
148
        runner = get_runner(runnertype_db.runner_module)
149
150
        # Configure the action runner.
151
        runner.container_service = RunnerContainerService()
152
        runner.action = action_db
153
        runner.action_name = action_db.name
154
        runner.action_execution_id = str(actionexec_db.id)
155
        runner.entry_point = RunnerContainerService.get_entry_point_abs_path(
156
            pack=action_db.pack, entry_point=action_db.entry_point)
157
        runner.context = getattr(actionexec_db, 'context', dict())
158
        runner.callback = getattr(actionexec_db, 'callback', dict())
159
        runner.libs_dir_path = RunnerContainerService.get_action_libs_abs_path(
160
            pack=action_db.pack, entry_point=action_db.entry_point)
161
162
        # Invoke the post_run method.
163
        runner.post_run(actionexec_db.status, actionexec_db.result)
164
165
    def _delete_state_object(self, query_context):
166
        state_db = ActionExecutionState.get_by_id(query_context.id)
167
        if state_db is not None:
168
            try:
169
                LOG.info('Clearing state object: %s', state_db)
170
                ActionExecutionState.delete(state_db)
171
            except:
172
                LOG.exception('Failed clearing state object: %s', state_db)
173
174
    def query(self, execution_id, query_context):
175
        """
176
        This is the method individual queriers must implement.
177
        This method should return a tuple of (status, results).
178
179
        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
180
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.
181
        """
182
        pass
183
184
    def print_stats(self):
185
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
186
                 self._query_contexts.qsize())
187
188
189
class QueryContext(object):
190
    def __init__(self, obj_id, execution_id, query_context, query_module):
191
        self.id = obj_id
192
        self.execution_id = execution_id
193
        self.query_context = query_context
194
        self.query_module = query_module
195
196
    @classmethod
197
    def from_model(cls, model):
198
        return QueryContext(str(model.id), str(model.execution_id), model.query_context,
199
                            model.query_module)
200
201
    def __repr__(self):
202
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
203
                (self.id, self.execution_id, self.query_context))
204