variables from the outer scope are not overridden.
1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | import os |
||
17 | |||
18 | import eventlet |
||
19 | from six.moves import queue |
||
20 | |||
21 | from st2common import log as logging |
||
22 | from st2exporter.exporter.file_writer import TextFileWriter |
||
23 | from st2exporter.exporter.json_converter import JsonConverter |
||
24 | from st2common.models.db.marker import DumperMarkerDB |
||
25 | from st2common.persistence.marker import DumperMarker |
||
26 | from st2common.util import date as date_utils |
||
27 | from st2common.util import isotime |
||
28 | |||
# Public names exported by this module.
__all__ = ['Dumper']

# File formats (also used as file extensions) the exporter is allowed to write.
ALLOWED_EXTENSIONS = ['json']

# Converter class for each supported format, keyed by the lower-cased format name.
CONVERTERS = {'json': JsonConverter}

LOG = logging.getLogger(__name__)
||
40 | |||
41 | |||
class Dumper(object):
    """
    Drain items from a queue and dump them to files on disk in batches.

    Each batch is converted with the converter registered for ``file_format`` and
    written to ``<export_dir>/<YYYY-MM-DD>/<file_prefix><timestamp>.<file_format>``.
    After a batch is written, the newest ``end_timestamp`` found in it is persisted
    as the dumper marker.
    """

    def __init__(self, queue, export_dir, file_format='json',
                 file_prefix='st2-executions-',
                 batch_size=1000, sleep_interval=60,
                 max_files_per_sleep=5,
                 file_writer=None):
        """
        :param queue: Queue to consume items from. Must support ``empty()``,
                      ``get(block=False)`` and ``qsize()``. Note that inside this
                      method the name shadows the imported ``queue`` module.
        :param export_dir: Existing directory under which date folders are created.
        :param file_format: Output format; must be one of ``ALLOWED_EXTENSIONS``
                            (compared case-insensitively).
        :param file_prefix: Prefix for every generated file name.
        :param batch_size: Maximum number of items written to a single file.
        :param sleep_interval: Seconds to sleep while the queue is empty.
        :param max_files_per_sleep: Maximum number of files written per flush cycle.
        :param file_writer: Optional writer exposing ``write_text(text, file_name)``.
                            Defaults to a new ``TextFileWriter``.

        :raises Exception: If ``queue`` or ``export_dir`` is falsy, or if
                           ``export_dir`` does not exist.
        :raises ValueError: If ``file_format`` is not allowed.
        """
        if not queue:
            raise Exception('Need a queue to consume data from.')

        if not export_dir:
            raise Exception('Export dir needed to dump files to.')

        self._export_dir = export_dir
        if not os.path.exists(self._export_dir):
            raise Exception('Dir path %s does not exist. Create one before using exporter.' %
                            self._export_dir)

        self._file_format = file_format.lower()
        if self._file_format not in ALLOWED_EXTENSIONS:
            raise ValueError('Disallowed extension %s.' % file_format)

        self._file_prefix = file_prefix
        self._batch_size = batch_size
        self._max_files_per_sleep = max_files_per_sleep
        self._queue = queue
        self._flush_thread = None
        self._sleep_interval = sleep_interval
        self._converter = CONVERTERS[self._file_format]()
        self._shutdown = False
        self._persisted_marker = None

        # Always set the attribute. Previously a caller-supplied writer was dropped
        # and ``self._file_writer`` was left undefined in that case.
        self._file_writer = file_writer if file_writer else TextFileWriter()

    def start(self, wait=False):
        """
        Spawn the background flush green thread.

        :param wait: If True, block until the flush thread finishes.
        """
        self._flush_thread = eventlet.spawn(self._flush)
        if wait:
            self.wait()

    def wait(self):
        """
        Block until the flush thread finishes. No-op if it was never started.
        """
        if self._flush_thread is None:
            return None
        return self._flush_thread.wait()

    def stop(self):
        """
        Signal shutdown and kill the flush thread (if one was started).

        :return: Result of ``eventlet.kill`` or None if the thread was never started.
        """
        self._shutdown = True
        if self._flush_thread is None:
            return None
        return eventlet.kill(self._flush_thread)

    def _get_batch(self):
        """
        Pull up to ``batch_size`` items off the queue without blocking.

        :return: None if the queue is empty on entry, otherwise a list of items.
        """
        if self._queue.empty():
            return None

        executions_to_write = []
        for _ in range(self._batch_size):
            try:
                item = self._queue.get(block=False)
            except queue.Empty:
                # Queue drained before the batch filled up.
                break
            else:
                executions_to_write.append(item)

        LOG.debug('Returning %d items in batch.', len(executions_to_write))
        LOG.debug('Remaining items in queue: %d', self._queue.qsize())
        return executions_to_write

    def _flush(self):
        """
        Flush loop: sleep while the queue is empty, otherwise write to disk.

        Runs until ``stop()`` sets the shutdown flag (or the thread is killed).
        """
        while not self._shutdown:
            if self._queue.empty():
                # Re-check the shutdown flag after every sleep.
                eventlet.sleep(self._sleep_interval)
                continue

            try:
                self._write_to_disk()
            except Exception:
                # ``Exception`` (not a bare except) so a kill request is not swallowed.
                LOG.exception('Failed writing data to disk.')
                # Back off before retrying so a persistent failure does not spin.
                eventlet.sleep(self._sleep_interval)

    def _write_to_disk(self):
        """
        Write up to ``max_files_per_sleep`` batches to disk.

        A failure while writing a single batch is logged and the loop moves on;
        the items of that batch have already been removed from the queue.

        :return: Number of batches written successfully.
        """
        count = 0
        # NOTE(review): the folder is created for "now" while the file name computes
        # the date again later; the two could differ across midnight UTC.
        self._create_date_folder()

        for _ in range(self._max_files_per_sleep):
            batch = self._get_batch()

            if not batch:
                return count

            try:
                self._write_batch_to_disk(batch)
                self._update_marker(batch)
                count += 1
            except Exception:
                LOG.exception('Writing batch to disk failed.')
        return count

    def _create_date_folder(self):
        """
        Ensure the sub-folder for the current UTC date exists under the export dir.

        :raises Exception: Re-raises whatever ``os.makedirs`` raised, after logging.
        """
        folder_name = self._get_date_folder()
        folder_path = os.path.join(self._export_dir, folder_name)

        if os.path.exists(folder_path):
            return

        try:
            os.makedirs(folder_path)
        except Exception:
            LOG.exception('Unable to create sub-folder %s for export.', folder_name)
            raise

    def _write_batch_to_disk(self, batch):
        """
        Convert ``batch`` with the configured converter and write it to a new file.
        """
        doc_to_write = self._converter.convert(batch)
        self._file_writer.write_text(doc_to_write, self._get_file_name())

    def _get_file_name(self):
        """
        Build the full path of a new export file from the current UTC timestamp.
        """
        timestring = date_utils.get_datetime_utc_now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        file_name = self._file_prefix + timestring + '.' + self._file_format
        file_name = os.path.join(self._export_dir, self._get_date_folder(), file_name)
        return file_name

    def _get_date_folder(self):
        """
        Return the folder name (``YYYY-MM-DD``) for the current UTC date.
        """
        return date_utils.get_datetime_utc_now().strftime('%Y-%m-%d')

    def _update_marker(self, batch):
        """
        Persist the newest ``end_timestamp`` of ``batch`` as the dumper marker.

        A failure to persist is logged and the in-memory marker is left untouched.

        :param batch: Items exposing an ``end_timestamp`` parseable by ``isotime.parse``.
        :return: The marker currently considered persisted.
        """
        timestamps = [isotime.parse(item.end_timestamp) for item in batch]
        if not timestamps:
            # Nothing to derive a marker from; ``max()`` would raise on an empty list.
            return self._persisted_marker

        new_marker = max(timestamps)

        # NOTE(review): the marker is still overwritten with the older value below.
        if self._persisted_marker and self._persisted_marker > new_marker:
            LOG.warning('Older executions are being exported. Perhaps out of order messages.')

        try:
            self._write_marker_to_db(new_marker)
        except Exception:
            LOG.exception('Failed persisting dumper marker to db.')
        else:
            self._persisted_marker = new_marker

        return self._persisted_marker

    def _write_marker_to_db(self, new_marker):
        """
        Create or update the dumper marker record in the database.

        If marker records already exist, the first one returned is updated.

        :param new_marker: Timestamp to store; formatted with ``isotime.format``.
        :return: Result of ``DumperMarker.add_or_update``.
        """
        LOG.info('Updating marker in db to: %s', new_marker)
        markers = DumperMarker.get_all()

        if len(markers) > 1:
            # Not inside an exception handler, so there is no traceback to log.
            LOG.error('More than one dumper marker found. Using first found one.')

        marker = isotime.format(new_marker, offset=False)
        updated_at = date_utils.get_datetime_utc_now()

        if markers:
            marker_id = markers[0]['id']
        else:
            marker_id = None

        marker_db = DumperMarkerDB(id=marker_id, marker=marker, updated_at=updated_at)
        return DumperMarker.add_or_update(marker_db)
||
191 |
It is generally bad practice to shadow variables from the outer scope. In most cases, this is done unintentionally and might lead to unexpected behavior: