Passed
Push — develop ( b8d4ca...421369 )
by Plexxi
07:01 queued 03:57
created

ResultsTracker._import_query_module()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
import six
18
19
from collections import defaultdict
20
from kombu import Connection
21
22
from st2common.query.base import QueryContext
23
from st2common import log as logging
24
from st2common.models.db.executionstate import ActionExecutionStateDB
25
from st2common.persistence.executionstate import ActionExecutionState
26
from st2common.transport import actionexecutionstate, consumers, publishers
27
from st2common.transport import utils as transport_utils
28
from st2common.util.loader import register_query_module
29
30
31
LOG = logging.getLogger(__name__)
32
33
ACTIONSTATE_WORK_Q = actionexecutionstate.get_queue('st2.resultstracker.work',
34
                                                    routing_key=publishers.CREATE_RK)
35
36
37
class ResultsTracker(consumers.MessageHandler):
38
    message_type = ActionExecutionStateDB
39
40
    def __init__(self, connection, queues):
41
        super(ResultsTracker, self).__init__(connection, queues)
42
        self._queriers = {}
43
        self._query_threads = []
44
        self._failed_imports = set()
45
46
    def start(self, wait=False):
47
        self._bootstrap()
48
        super(ResultsTracker, self).start(wait=wait)
49
50
    def wait(self):
51
        super(ResultsTracker, self).wait()
52
        for thread in self._query_threads:
53
            thread.wait()
54
55
    def shutdown(self):
56
        super(ResultsTracker, self).shutdown()
57
        LOG.info('Stats from queriers:')
58
        self._print_stats()
59
60
    def _print_stats(self):
61
        for _, querier in six.iteritems(self._queriers):
62
            if querier:
63
                querier.print_stats()
64
65
    def _bootstrap(self):
66
        all_states = ActionExecutionState.get_all()
67
        LOG.info('Found %d pending states in db.' % len(all_states))
68
69
        query_contexts_dict = defaultdict(list)
70
        for state_db in all_states:
71
            try:
72
                context = QueryContext.from_model(state_db)
73
            except:
74
                LOG.exception('Invalid state object: %s', state_db)
75
                continue
76
            query_module_name = state_db.query_module
77
            querier = self.get_querier(query_module_name)
78
79
            if querier is not None:
80
                query_contexts_dict[querier].append(context)
81
82
        for querier, contexts in six.iteritems(query_contexts_dict):
83
            LOG.info('Found %d pending actions for query module %s', len(contexts), querier)
84
            querier.add_queries(query_contexts=contexts)
85
86
    def process(self, query_context):
87
        querier = self.get_querier(query_context.query_module)
88
        context = QueryContext.from_model(query_context)
89
        querier.add_queries(query_contexts=[context])
90
        return
91
92
    def get_querier(self, query_module_name):
93
        if (query_module_name not in self._queriers and
94
                query_module_name not in self._failed_imports):
95
            try:
96
                query_module = register_query_module(query_module_name)
97
            except:
98
                LOG.exception('Failed importing query module: %s', query_module_name)
99
                self._failed_imports.add(query_module_name)
100
                self._queriers[query_module_name] = None
101
            else:
102
                querier = query_module.get_instance()
103
                self._queriers[query_module_name] = querier
104
                self._query_threads.append(eventlet.spawn(querier.start))
105
106
        return self._queriers[query_module_name]
107
108
109
def get_tracker():
110
    with Connection(transport_utils.get_messaging_urls()) as conn:
111
        return ResultsTracker(conn, [ACTIONSTATE_WORK_Q])
112