Passed
Push — master ( 99c26c...6c755b )
by
unknown
03:20
created

Querier._query_and_save_results()   C

Complexity

Conditions 7

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
c 0
b 0
f 0
dl 0
loc 33
rs 5.5
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from st2actions.container.service import RunnerContainerService
23
from st2common import log as logging
24
from st2common.constants import action as action_constants
25
from st2common.persistence.executionstate import ActionExecutionState
26
from st2common.persistence.liveaction import LiveAction
27
from st2common.runners import utils as runners_utils
28
from st2common.services import executions
29
from st2common.util import date as date_utils
30
31
# Names exported by ``from <module> import *``.
__all__ = ['Querier', 'QueryContext']

# Module-level logger used by Querier.
LOG = logging.getLogger(__name__)
37
38
39
@six.add_metaclass(abc.ABCMeta)
class Querier(object):
    """
    Base class for pollers that fetch execution results from an external service.

    Query contexts are kept in an in-memory queue together with the time they were
    queued. ``start()`` loops forever and hands contexts that are due to a green
    thread pool. Each worker calls ``query()``, stores the returned status and result
    on the live action and re-queues the context unless the status is a completed one.
    """

    # When True, the persisted state object is deleted if querying or saving fails.
    delete_state_object_on_error = True

    def __init__(self, threads_pool_size=10, query_interval=1, empty_q_sleep_time=5,
                 no_workers_sleep_time=1, container_service=None):
        """
        :param threads_pool_size: Size of the green thread pool that runs the queries.
        :param query_interval: Minimum number of seconds between a context being queued
                               and it being queried.
        :param empty_q_sleep_time: Seconds ``start()`` sleeps while the queue is empty.
        :param no_workers_sleep_time: Seconds ``start()`` sleeps while the pool has no
                                      free worker.
        :param container_service: Service object exposed as ``self.container_service``;
                                  a new ``RunnerContainerService`` is created when a
                                  falsy value is given.
        """
        self._query_threads_pool_size = threads_pool_size
        self._query_contexts = Queue.Queue()
        self._thread_pool = eventlet.GreenPool(self._query_threads_pool_size)
        self._empty_q_sleep_time = empty_q_sleep_time
        self._no_workers_sleep_time = no_workers_sleep_time
        self._query_interval = query_interval
        if not container_service:
            container_service = RunnerContainerService()
        self.container_service = container_service
        self._started = False

    def start(self):
        """
        Run the dispatch loop. This method never returns.
        """
        self._started = True
        while True:
            while self._query_contexts.empty():
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
            while self._thread_pool.free() <= 0:
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
            next_due_in = self._fire_queries()
            # Always yield so freshly spawned workers get to run, and wait until the
            # earliest parked context is due instead of spinning on the queue.
            eventlet.greenthread.sleep(next_due_in or 0)

    def add_queries(self, query_contexts=None):
        """
        Queue the given query contexts, stamped with the current time.

        :param query_contexts: Iterable of query contexts; ``None`` means nothing to add.
        """
        if query_contexts is None:
            query_contexts = []
        LOG.debug('Adding queries to querier: %s', query_contexts)
        for query_context in query_contexts:
            self._query_contexts.put((time.time(), query_context))

    def is_started(self):
        """
        Return True once ``start()`` has been called.
        """
        return self._started

    def _fire_queries(self):
        """
        Make one pass over the queue and spawn a worker for every context that is due.

        Contexts queued less than ``query_interval`` seconds ago are put back on the
        queue once the pass is over. Putting them back inside the loop would make the
        loop pick them up again immediately and spin without ever yielding to the
        workers.

        :return: Seconds until the earliest parked context is due, or None when there
                 is nothing to wait for.
        """
        if self._thread_pool.free() <= 0:
            return None

        now = time.time()
        not_due = []
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
            (last_query_time, query_context) = self._query_contexts.get_nowait()
            if now - last_query_time < self._query_interval:
                not_due.append((last_query_time, query_context))
            else:
                self._thread_pool.spawn(self._query_and_save_results, query_context)

        # If the pass stopped because the pool filled up, contexts that were never
        # inspected are still queued; the caller should wait for a worker, not a timer.
        queue_drained = self._query_contexts.empty()

        for entry in not_due:
            self._query_contexts.put(entry)

        if not not_due or not queue_drained:
            return None

        earliest = min(queued_at for (queued_at, _) in not_due)
        remaining = self._query_interval - (now - earliest)
        # Clamp so a wall clock adjustment cannot produce a negative or oversized wait.
        return min(self._query_interval, max(0, remaining))

    def _query_and_save_results(self, query_context):
        """
        Query results for one context, persist them and decide whether to poll again.

        On a failure while querying or saving, the context is dropped (not re-queued).
        On a completed status the state object is deleted and, unless the status is
        canceled, post-run is invoked. Otherwise the context is re-queued.

        :param query_context: Object exposing ``id``, ``execution_id`` and
                              ``query_context`` attributes.
        """
        execution_id = query_context.execution_id
        actual_query_context = query_context.query_context

        LOG.debug('Querying external service for results. Context: %s', actual_query_context)
        try:
            (status, results) = self.query(execution_id, actual_query_context)
        except Exception:
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
            self._cleanup_after_failure(query_context)
            return

        try:
            liveaction_db = self._update_action_results(execution_id, status, results)
        except Exception:
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
            self._cleanup_after_failure(query_context)
            return

        if status in action_constants.LIVEACTION_COMPLETED_STATES:
            if status != action_constants.LIVEACTION_STATUS_CANCELED:
                runners_utils.invoke_post_run(liveaction_db)

            self._delete_state_object(query_context)

            return

        self._query_contexts.put((time.time(), query_context))

    def _cleanup_after_failure(self, query_context):
        """
        Delete the state object for a failed context if the class is configured to.
        """
        if self.delete_state_object_on_error:
            self._delete_state_object(query_context)
            LOG.debug('Removed state object %s.', query_context)

    def _update_action_results(self, execution_id, status, results):
        """
        Store the queried status and result on the live action and publish the update.

        :return: The updated live action object.
        :raises Exception: If no live action exists for ``execution_id``.
        """
        liveaction_db = LiveAction.get_by_id(execution_id)
        if not liveaction_db:
            raise Exception('No DB model for liveaction_id: %s' % execution_id)

        # A live action that is already canceled keeps its canceled status.
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
            liveaction_db.status = status

        liveaction_db.result = results

        # Action has completed, record end_timestamp
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
                not liveaction_db.end_timestamp):
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()

        # update liveaction, update actionexecution and then publish update.
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
        executions.update_execution(updated_liveaction)
        LiveAction.publish_update(updated_liveaction)

        return updated_liveaction

    def _delete_state_object(self, query_context):
        """
        Delete the execution state object identified by ``query_context.id``.

        A failure while deleting is logged and not re-raised.
        """
        state_db = ActionExecutionState.get_by_id(query_context.id)
        if state_db is not None:
            try:
                LOG.info('Clearing state object: %s', state_db)
                ActionExecutionState.delete(state_db)
            except Exception:
                LOG.exception('Failed clearing state object: %s', state_db)

    def query(self, execution_id, query_context):
        """
        This is the method individual queriers must implement.
        This method should return a tuple of (status, results).

        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.

        This base implementation returns None, which _query_and_save_results cannot
        unpack and therefore handles as a failed query.
        """
        pass

    def print_stats(self):
        """
        Log the class name and the number of query contexts currently queued.
        """
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
                 self._query_contexts.qsize())
164
165
166
class QueryContext(object):
    """
    Holder for the data needed to poll the results of one execution.
    """

    def __init__(self, obj_id, execution_id, query_context, query_module):
        """
        :param obj_id: Stored as ``id``.
        :param execution_id: Stored as ``execution_id``.
        :param query_context: Stored as ``query_context``.
        :param query_module: Stored as ``query_module``.
        """
        self.id = obj_id
        self.execution_id = execution_id
        self.query_context = query_context
        self.query_module = query_module

    @classmethod
    def from_model(cls, model):
        """
        Build a context from an object exposing ``id``, ``execution_id``,
        ``query_context`` and ``query_module`` attributes.

        ``id`` and ``execution_id`` are converted to strings. The instance is created
        through ``cls`` so that calling this on a subclass returns that subclass.
        """
        return cls(str(model.id), str(model.execution_id), model.query_context,
                   model.query_module)

    def __repr__(self):
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
                (self.id, self.execution_id, self.query_context))
181