Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2exporter/st2exporter/exporter/dumper.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
18
import eventlet
19
from six.moves import queue
20
21
from st2common import log as logging
22
from st2exporter.exporter.file_writer import TextFileWriter
23
from st2exporter.exporter.json_converter import JsonConverter
24
from st2common.models.db.marker import DumperMarkerDB
25
from st2common.persistence.marker import DumperMarker
26
from st2common.util import date as date_utils
27
from st2common.util import isotime
28
29
__all__ = [
    'Dumper',
]

# File formats (and therefore file extensions) the dumper is able to produce.
ALLOWED_EXTENSIONS = ['json']

# Maps each allowed file format to the converter class used to serialize a batch.
CONVERTERS = dict(json=JsonConverter)

LOG = logging.getLogger(__name__)
40
41
42
class Dumper(object):
    """Drain a queue of executions in batches and dump them to files on disk.

    Each batch of up to ``batch_size`` items is serialized with the converter
    registered for ``file_format``. It is written to
    ``<export_dir>/<YYYY-MM-DD>/<file_prefix><timestamp>.<file_format>``, using UTC.
    After a batch is written, the newest ``end_timestamp`` in it is persisted
    as the dumper marker.
    """

    def __init__(self, queue, export_dir, file_format='json',
                 file_prefix='st2-executions-',
                 batch_size=1000, sleep_interval=60,
                 max_files_per_sleep=5,
                 file_writer=None):
        """
        :param queue: Queue-like object to consume items from. It must support
                      ``empty()``, ``get(block=False)`` and ``qsize()``.
        :param export_dir: Existing directory under which date folders are created.
        :param file_format: Output format; must be one of ``ALLOWED_EXTENSIONS``
                            (case-insensitive).
        :param file_prefix: Prefix for every written file name.
        :param batch_size: Maximum number of items written to a single file.
        :param sleep_interval: Seconds to sleep between polls of an empty queue.
        :param max_files_per_sleep: Maximum number of files written per flush pass.
        :param file_writer: Object exposing ``write_text(text, path)``. Defaults to
                            a ``TextFileWriter``.
        :raises Exception: If ``queue`` or ``export_dir`` is missing, or
                           ``export_dir`` does not exist.
        :raises ValueError: If ``file_format`` is not an allowed extension.
        """
        # NOTE: the ``queue`` parameter shadows the module-level ``six.moves.queue``
        # import inside this method only. The name is kept because callers may
        # pass it by keyword.
        if not queue:
            raise Exception('Need a queue to consume data from.')

        if not export_dir:
            raise Exception('Export dir needed to dump files to.')

        self._export_dir = export_dir
        if not os.path.exists(self._export_dir):
            raise Exception('Dir path %s does not exist. Create one before using exporter.' %
                            self._export_dir)

        self._file_format = file_format.lower()
        if self._file_format not in ALLOWED_EXTENSIONS:
            raise ValueError('Disallowed extension %s.' % file_format)

        self._file_prefix = file_prefix
        self._batch_size = batch_size
        self._max_files_per_sleep = max_files_per_sleep
        self._queue = queue
        self._flush_thread = None
        self._sleep_interval = sleep_interval
        self._converter = CONVERTERS[self._file_format]()
        self._shutdown = False
        self._persisted_marker = None

        # Previously a caller-supplied writer was silently dropped, leaving
        # ``self._file_writer`` unset; honour it when given.
        if not file_writer:
            self._file_writer = TextFileWriter()
        else:
            self._file_writer = file_writer

    def start(self, wait=False):
        """Spawn the flush green thread; block until it exits if ``wait`` is true."""
        self._flush_thread = eventlet.spawn(self._flush)
        if wait:
            self.wait()

    def wait(self):
        """Block until the flush green thread exits (no-op if never started)."""
        if self._flush_thread is None:
            return
        self._flush_thread.wait()

    def stop(self):
        """Request shutdown and kill the flush green thread, if one was started."""
        self._shutdown = True
        if self._flush_thread is None:
            return None
        return eventlet.kill(self._flush_thread)

    def _get_batch(self):
        """Pop up to ``batch_size`` items from the queue without blocking.

        :return: ``None`` if the queue is empty, otherwise a list of the items
                 popped (it may be shorter than ``batch_size``).
        """
        if self._queue.empty():
            return None

        executions_to_write = []
        for _ in range(self._batch_size):
            try:
                item = self._queue.get(block=False)
            except queue.Empty:
                break
            else:
                executions_to_write.append(item)

        LOG.debug('Returning %d items in batch.', len(executions_to_write))
        LOG.debug('Remaining items in queue: %d', self._queue.qsize())
        return executions_to_write

    def _flush(self):
        """Main loop: wait for queued data, then write it out until shut down."""
        while not self._shutdown:
            while self._queue.empty() and not self._shutdown:
                eventlet.sleep(self._sleep_interval)

            if self._shutdown:
                break

            # ``Exception`` (not a bare except) so that ``GreenletExit`` raised by
            # ``eventlet.kill`` is not swallowed.
            try:
                self._write_to_disk()
            except Exception:
                LOG.exception('Failed writing data to disk.')

    def _write_to_disk(self):
        """Write up to ``max_files_per_sleep`` batches, one file per batch.

        :return: Number of batches written successfully.
        :raises: Whatever ``_create_date_folder`` raises if today's folder
                 cannot be created. No items are dequeued in that case.
        """
        count = 0
        self._create_date_folder()

        for _ in range(self._max_files_per_sleep):
            batch = self._get_batch()

            if not batch:
                return count

            # A failed batch is logged and skipped; its items are already
            # dequeued and are not retried.
            try:
                self._write_batch_to_disk(batch)
                self._update_marker(batch)
                count += 1
            except Exception:
                LOG.exception('Writing batch to disk failed.')
        return count

    def _create_date_folder(self, now=None):
        """Ensure the date sub-folder for ``now`` (default: current UTC time) exists.

        :raises: The error from ``os.makedirs`` if the folder could not be
                 created and does not exist afterwards.
        """
        folder_name = self._get_date_folder(now)
        folder_path = os.path.join(self._export_dir, folder_name)

        if os.path.exists(folder_path):
            return

        try:
            os.makedirs(folder_path)
        except Exception:
            # Another writer may have created it between the check and makedirs().
            if os.path.isdir(folder_path):
                return
            LOG.exception('Unable to create sub-folder %s for export.', folder_name)
            raise

    def _write_batch_to_disk(self, batch):
        """Serialize ``batch`` and write it to a new timestamped file."""
        doc_to_write = self._converter.convert(batch)
        # One clock reading drives both the folder and the file name. A batch
        # written just after midnight UTC therefore goes to a folder that exists.
        now = date_utils.get_datetime_utc_now()
        self._create_date_folder(now)
        self._file_writer.write_text(doc_to_write, self._get_file_name(now))

    def _get_file_name(self, now=None):
        """Return the full output path for a file written at ``now`` (default: UTC now)."""
        if now is None:
            now = date_utils.get_datetime_utc_now()
        timestring = now.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        file_name = self._file_prefix + timestring + '.' + self._file_format
        return os.path.join(self._export_dir, self._get_date_folder(now), file_name)

    def _get_date_folder(self, now=None):
        """Return the ``YYYY-MM-DD`` folder name for ``now`` (default: UTC now)."""
        if now is None:
            now = date_utils.get_datetime_utc_now()
        return now.strftime('%Y-%m-%d')

    def _update_marker(self, batch):
        """Persist the newest ``end_timestamp`` in ``batch`` as the dumper marker.

        Each item is expected to expose an ``end_timestamp`` accepted by
        ``isotime.parse``.

        :return: The marker currently known to be persisted. This is the previous
                 one if persisting failed or ``batch`` is empty.
        """
        if not batch:
            return self._persisted_marker

        timestamps = [isotime.parse(item.end_timestamp) for item in batch]
        new_marker = max(timestamps)

        if self._persisted_marker and self._persisted_marker > new_marker:
            LOG.warning('Older executions are being exported. Perhaps out of order messages.')

        try:
            self._write_marker_to_db(new_marker)
        except Exception:
            LOG.exception('Failed persisting dumper marker to db.')
        else:
            self._persisted_marker = new_marker

        return self._persisted_marker

    def _write_marker_to_db(self, new_marker):
        """Create or update the single ``DumperMarker`` record with ``new_marker``.

        If several markers exist, the first one returned by ``get_all()`` is updated.
        """
        LOG.info('Updating marker in db to: %s', new_marker)
        markers = DumperMarker.get_all()

        if len(markers) > 1:
            # Not inside an ``except`` block, so ``exception()`` would log a bogus
            # empty traceback; a warning is the intended severity.
            LOG.warning('More than one dumper marker found. Using first found one.')

        marker = isotime.format(new_marker, offset=False)
        updated_at = date_utils.get_datetime_utc_now()

        if markers:
            marker_id = markers[0]['id']
        else:
            marker_id = None

        marker_db = DumperMarkerDB(id=marker_id, marker=marker, updated_at=updated_at)
        return DumperMarker.add_or_update(marker_db)