# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
|
|
|
|
16
|
|
|
import eventlet |
17
|
|
|
import six |
18
|
|
|
|
19
|
|
|
from collections import defaultdict |
20
|
|
|
from kombu import Connection |
21
|
|
|
|
22
|
|
|
from st2common.query.base import QueryContext |
23
|
|
|
from st2common import log as logging |
24
|
|
|
from st2common.models.db.executionstate import ActionExecutionStateDB |
25
|
|
|
from st2common.persistence.executionstate import ActionExecutionState |
26
|
|
|
from st2common.transport import actionexecutionstate, consumers, publishers |
27
|
|
|
from st2common.transport import utils as transport_utils |
28
|
|
|
from st2common.util.loader import register_query_module |
29
|
|
|
|
30
|
|
|
|
31
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Queue object handed to ResultsTracker by get_tracker(). It is obtained from
# the actionexecutionstate transport module using the publishers' CREATE
# routing key.
ACTIONSTATE_WORK_Q = actionexecutionstate.get_queue(
    'st2.resultstracker.work',
    routing_key=publishers.CREATE_RK,
)
35
|
|
|
|
36
|
|
|
|
37
|
|
|
class ResultsTracker(consumers.MessageHandler):
    """Message handler that routes action execution states to queriers.

    Each state names a query module. The module is registered on first use
    via ``register_query_module``, its querier instance is started in an
    eventlet green thread, and the state (wrapped in a ``QueryContext``) is
    passed to that querier through ``add_queries``.
    """

    message_type = ActionExecutionStateDB

    def __init__(self, connection, queues):
        """Initialise the handler.

        :param connection: Connection passed through to the parent handler.
        :param queues: Queues passed through to the parent handler.
        """
        super(ResultsTracker, self).__init__(connection, queues)
        # query module name -> querier instance, or None if the import failed.
        self._queriers = {}
        # Green threads running ``querier.start`` for every loaded querier.
        self._query_threads = []
        # Names of query modules whose import failed; never retried.
        self._failed_imports = set()

    def start(self, wait=False):
        """Load the states already stored in the db, then start the handler.

        :param wait: Passed through unchanged to the parent ``start``.
        """
        self._bootstrap()
        super(ResultsTracker, self).start(wait=wait)

    def wait(self):
        """Wait on the parent handler and then on every querier thread."""
        super(ResultsTracker, self).wait()
        for thread in self._query_threads:
            thread.wait()

    def shutdown(self):
        """Shut down the parent handler and log the queriers' statistics."""
        super(ResultsTracker, self).shutdown()
        LOG.info('Stats from queriers:')
        self._print_stats()

    def _print_stats(self):
        """Call ``print_stats`` on every querier that was loaded."""
        for querier in six.itervalues(self._queriers):
            # Entries are None for modules that failed to import.
            if querier:
                querier.print_stats()

    def _bootstrap(self):
        """Hand every state currently stored in the db to its querier.

        States that cannot be converted into a ``QueryContext`` are logged
        and skipped, as are states whose query module could not be loaded.
        """
        all_states = ActionExecutionState.get_all()
        LOG.info('Found %d pending states in db.', len(all_states))

        query_contexts_dict = defaultdict(list)
        for state_db in all_states:
            try:
                context = QueryContext.from_model(state_db)
            except Exception:
                # Deliberately not a bare ``except``: interpreter exit and
                # green thread kill signals must keep propagating.
                LOG.exception('Invalid state object: %s', state_db)
                continue

            querier = self.get_querier(state_db.query_module)
            if querier is not None:
                query_contexts_dict[querier].append(context)

        for querier, contexts in six.iteritems(query_contexts_dict):
            LOG.info('Found %d pending actions for query module %s', len(contexts), querier)
            querier.add_queries(query_contexts=contexts)

    def process(self, query_context):
        """Hand a single incoming state to the querier for its query module.

        If the query module could not be loaded, the state is logged and
        skipped, the same way ``_bootstrap`` skips such states, instead of
        failing with an ``AttributeError`` on ``None``.

        :param query_context: State object exposing ``query_module``; it is
            converted with ``QueryContext.from_model``.
        """
        querier = self.get_querier(query_context.query_module)
        if querier is None:
            LOG.error('No querier available for query module %s. Skipping state: %s',
                      query_context.query_module, query_context)
            return

        context = QueryContext.from_model(query_context)
        querier.add_queries(query_contexts=[context])
        return

    def get_querier(self, query_module_name):
        """Return the querier for ``query_module_name``, loading it if needed.

        On first use the module is registered, its querier instance is cached
        and ``querier.start`` is spawned in a green thread. A failed import is
        remembered so that it is only attempted (and logged) once.

        :param query_module_name: Name of the query module to look up.
        :return: The cached querier instance, or ``None`` if the module
            failed to import.
        """
        if (query_module_name not in self._queriers and
                query_module_name not in self._failed_imports):
            try:
                query_module = register_query_module(query_module_name)
            except Exception:
                # Deliberately not a bare ``except``: interpreter exit and
                # green thread kill signals must keep propagating.
                LOG.exception('Failed importing query module: %s', query_module_name)
                self._failed_imports.add(query_module_name)
                self._queriers[query_module_name] = None
            else:
                querier = query_module.get_instance()
                self._queriers[query_module_name] = querier
                self._query_threads.append(eventlet.spawn(querier.start))

        return self._queriers[query_module_name]
107
|
|
|
|
108
|
|
|
|
109
|
|
|
def get_tracker():
    """Create a ResultsTracker listening on the action state work queue.

    The tracker is built with a ``Connection`` opened from the messaging URLs
    reported by ``transport_utils`` and with ``ACTIONSTATE_WORK_Q`` as its
    only queue.

    :return: A new ``ResultsTracker`` instance.
    """
    messaging_urls = transport_utils.get_messaging_urls()
    # NOTE(review): the ``with`` block exits as the tracker is returned, so
    # the Connection's context-manager cleanup has already run by the time the
    # caller uses the tracker - confirm this is intended.
    with Connection(messaging_urls) as connection:
        tracker = ResultsTracker(connection, [ACTIONSTATE_WORK_Q])
        return tracker
112
|
|
|
|