Completed
Push — master ( d1f0a7...3b2ece )
by Edward
21:04 queued 05:38
created

st2exporter.ExecutionsExporter.__init__()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 6
rs 9.4286
cc 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import Queue
17
18
import eventlet
19
from kombu import Connection
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.constants.action import (LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_FAILED,
24
                                        LIVEACTION_STATUS_CANCELED)
25
from st2common.models.api.execution import ActionExecutionAPI
26
from st2common.models.db.execution import ActionExecutionDB
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.marker import DumperMarker
29
from st2common.transport import consumers, execution, publishers
30
from st2common.transport import utils as transport_utils
31
from st2common.util import isotime
32
from st2exporter.exporter.dumper import Dumper
33
34
# Public names of this module.
__all__ = ['ExecutionsExporter']

# Only executions whose status is one of these get queued for export.
COMPLETION_STATUSES = [
    LIVEACTION_STATUS_SUCCEEDED,
    LIVEACTION_STATUS_FAILED,
    LIVEACTION_STATUS_CANCELED
]

LOG = logging.getLogger(__name__)

# Queue which get_worker() hands to the ExecutionsExporter it creates.
EXPORTER_WORK_Q = execution.get_queue('st2.exporter.work',
                                      routing_key=publishers.UPDATE_RK)
44
45
46
class ExecutionsExporter(consumers.MessageHandler):
    """
    Message handler which queues completed action executions for export.

    Executions whose status is in ``COMPLETION_STATUSES`` are converted to
    ``ActionExecutionAPI`` objects (with secrets masked) and put on an in-memory queue
    which is shared with a ``Dumper`` instance.
    """

    message_type = ActionExecutionDB

    def __init__(self, connection, queues):
        """
        :param connection: Connection passed through to ``consumers.MessageHandler``.
        :param queues: Queues passed through to ``consumers.MessageHandler``.
        """
        super(ExecutionsExporter, self).__init__(connection, queues)
        self.pending_executions = Queue.Queue()
        self._dumper = Dumper(queue=self.pending_executions,
                              export_dir=cfg.CONF.exporter.dump_dir)
        # Assigned in start(); holds the green thread running the base class consumer.
        self._consumer_thread = None

    def start(self, wait=False):
        """
        Bootstrap executions from the db, then start the consumer and the dumper.

        :param wait: If True, block in :meth:`wait` once everything has been started.
        :type wait: ``bool``

        :raises Exception: Any error raised while bootstrapping is logged and re-raised.
        """
        LOG.info('Bootstrapping executions from db...')
        try:
            self._bootstrap()
        except Exception:
            LOG.exception('Unable to bootstrap executions from db. Aborting.')
            raise

        # The base class start() is run in its own green thread so that this method can
        # go on and start the dumper.
        self._consumer_thread = eventlet.spawn(super(ExecutionsExporter, self).start, wait=True)
        self._dumper.start()
        if wait:
            self.wait()

    def wait(self):
        """
        Wait on the consumer green thread and then on the dumper.

        Must be called after :meth:`start`, which is what creates the consumer thread.
        """
        self._consumer_thread.wait()
        self._dumper.wait()

    def shutdown(self):
        """
        Stop the dumper and then shut down the base class message handler.
        """
        self._dumper.stop()
        super(ExecutionsExporter, self).shutdown()

    def process(self, execution):
        """
        Queue a received execution for export if it has reached a completion status.

        :param execution: Execution received from the message queue.
        :type execution: ``ActionExecutionDB``
        """
        # NOTE: Inside this method the ``execution`` parameter shadows the module-level
        # ``execution`` import from ``st2common.transport``. The parameter name is kept
        # because it is part of the method's interface.
        LOG.debug('Got execution from queue: %s', execution)
        if execution.status not in COMPLETION_STATUSES:
            return
        execution_api = ActionExecutionAPI.from_model(execution, mask_secrets=True)
        self.pending_executions.put_nowait(execution_api)
        LOG.debug("Added execution to queue.")

    def _bootstrap(self):
        """
        Queue executions from the db which are newer than the stored export marker.

        If no marker is available, all the executions are considered. Only executions
        with a status in ``COMPLETION_STATUSES`` are queued.
        """
        marker = self._get_export_marker_from_db()
        LOG.info('Using marker %s...', marker)
        missed_executions = self._get_missed_executions_from_db(export_marker=marker)
        LOG.info('Found %d executions not exported yet...', len(missed_executions))

        for missed_execution in missed_executions:
            if missed_execution.status not in COMPLETION_STATUSES:
                continue
            execution_api = ActionExecutionAPI.from_model(missed_execution, mask_secrets=True)
            try:
                LOG.debug('Missed execution %s', execution_api)
                self.pending_executions.put_nowait(execution_api)
            except Exception:
                # Best effort - a failure for one execution must not abort the others.
                LOG.exception('Failed adding execution to in-memory queue.')
        LOG.info('Bootstrapped executions...')

    def _get_export_marker_from_db(self):
        """
        Return the parsed export marker stored in the db.

        :return: Result of ``isotime.parse`` for the first stored marker, or ``None`` if
                 no marker is stored or the markers could not be retrieved.
        """
        try:
            markers = DumperMarker.get_all()
        except Exception:
            # A missing marker is not fatal - the caller falls back to all executions -
            # but the failure should be visible in the logs.
            LOG.exception('Unable to retrieve export marker from db. Using no marker.')
            return None

        if len(markers) < 1:
            return None

        return isotime.parse(markers[0].marker)

    def _get_missed_executions_from_db(self, export_marker=None):
        """
        Return executions which ended after the provided marker.

        :param export_marker: Value compared against ``end_timestamp``. When it is not
                              provided (or is falsy), all the executions are returned.

        :return: Result of the ``ActionExecution`` query.
        """
        if not export_marker:
            return self._get_all_executions_from_db()

        # XXX: Should adapt this query to get only executions with status
        # in COMPLETION_STATUSES.
        filters = {'end_timestamp__gt': export_marker}
        LOG.info('Querying for executions with filters: %s', filters)
        return ActionExecution.query(**filters)

    def _get_all_executions_from_db(self):
        """
        Return all the executions stored in the db.
        """
        return ActionExecution.get_all()  # XXX: Paginated call.
127
128
def get_worker():
    """
    Create an ``ExecutionsExporter`` which consumes from ``EXPORTER_WORK_Q``.

    The exporter is constructed inside the ``Connection`` context manager, so the
    context has already exited by the time the caller receives the instance.
    NOTE(review): presumably the handler re-establishes the connection on demand -
    confirm against the kombu / consumers behaviour.

    :rtype: ``ExecutionsExporter``
    """
    messaging_urls = transport_utils.get_messaging_urls()
    with Connection(messaging_urls) as connection:
        worker = ExecutionsExporter(connection, [EXPORTER_WORK_Q])
    return worker
131