Passed: Push develop-v1.3.1 (969842...8fa207) by unknown, created 05:56

Dumper._get_batch()   rating: B

Complexity    Conditions 5
Size          Total Lines 16
Duplication   Lines 0 (Ratio 0 %)

Metric   Value
cc       5
dl       0
loc      16
rs       8.5454
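
These per-method metrics can be reproduced locally. A minimal sketch using radon (an assumption, since the report does not name its analyzer, and radon's letter ranks may not match the B grade shown above):

# Sketch: per-function cyclomatic complexity, assuming the listing below
# is saved as dumper.py. radon is an assumed stand-in for the report's
# own analyzer.
from radon.complexity import cc_visit, cc_rank

with open('dumper.py') as f:
    source = f.read()

for block in cc_visit(source):
    # e.g. "_get_batch: cc=5 rank=A" (radon's scale; the report grades it B)
    print('%s: cc=%d rank=%s' % (block.name, block.complexity,
                                 cc_rank(block.complexity)))
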
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import Queue  # Python 2 stdlib module; renamed to `queue` in Python 3

import eventlet

from st2common import log as logging
from st2exporter.exporter.file_writer import TextFileWriter
from st2exporter.exporter.json_converter import JsonConverter
from st2common.models.db.marker import DumperMarkerDB
from st2common.persistence.marker import DumperMarker
from st2common.util import date as date_utils
from st2common.util import isotime

__all__ = [
    'Dumper'
]

ALLOWED_EXTENSIONS = ['json']

CONVERTERS = {
    'json': JsonConverter
}

LOG = logging.getLogger(__name__)


class Dumper(object):

    def __init__(self, queue, export_dir, file_format='json',
                 file_prefix='st2-executions-',
                 batch_size=1000, sleep_interval=60,
                 max_files_per_sleep=5,
                 file_writer=None):
        if not queue:
            raise Exception('Need a queue to consume data from.')

        if not export_dir:
            raise Exception('Export dir needed to dump files to.')

        self._export_dir = export_dir
        if not os.path.exists(self._export_dir):
            raise Exception('Dir path %s does not exist. Create one before using exporter.' %
                            self._export_dir)

        self._file_format = file_format.lower()
        if self._file_format not in ALLOWED_EXTENSIONS:
            raise ValueError('Disallowed extension %s.' % file_format)

        self._file_prefix = file_prefix
        self._batch_size = batch_size
        self._max_files_per_sleep = max_files_per_sleep
        self._queue = queue
        self._flush_thread = None
        self._sleep_interval = sleep_interval
        self._converter = CONVERTERS[self._file_format]()
        self._shutdown = False
        self._persisted_marker = None

        if not file_writer:
            self._file_writer = TextFileWriter()

    def start(self, wait=False):
        # Run the flush loop in an eventlet green thread.
        self._flush_thread = eventlet.spawn(self._flush)
        if wait:
            self.wait()

    def wait(self):
        self._flush_thread.wait()

    def stop(self):
        self._shutdown = True
        return eventlet.kill(self._flush_thread)

    def _get_batch(self):
        # The method rated above: drain up to batch_size items from the
        # queue without blocking; None signals that there is no work.
        if self._queue.empty():
            return None

        executions_to_write = []
        for _ in range(self._batch_size):
            try:
                item = self._queue.get(block=False)
            except Queue.Empty:
                break
            else:
                executions_to_write.append(item)

        LOG.debug('Returning %d items in batch.', len(executions_to_write))
        LOG.debug('Remaining items in queue: %d', self._queue.qsize())
        return executions_to_write
    def _flush(self):
        # Green-thread loop: poll until the queue has items, then write.
        while not self._shutdown:
            while self._queue.empty():
                eventlet.sleep(self._sleep_interval)

            try:
                self._write_to_disk()
            except:
                LOG.error('Failed writing data to disk.')

    def _write_to_disk(self):
        count = 0
        self._create_date_folder()

        for _ in range(self._max_files_per_sleep):
            batch = self._get_batch()

            if not batch:
                return count

            try:
                self._write_batch_to_disk(batch)
                self._update_marker(batch)
                count += 1
            except:
                LOG.exception('Writing batch to disk failed.')
        return count

    def _create_date_folder(self):
        folder_name = self._get_date_folder()
        folder_path = os.path.join(self._export_dir, folder_name)

        if not os.path.exists(folder_path):
            try:
                os.makedirs(folder_path)
            except:
                LOG.exception('Unable to create sub-folder %s for export.', folder_name)
                raise

    def _write_batch_to_disk(self, batch):
        doc_to_write = self._converter.convert(batch)
        self._file_writer.write_text(doc_to_write, self._get_file_name())

    def _get_file_name(self):
        timestring = date_utils.get_datetime_utc_now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        file_name = self._file_prefix + timestring + '.' + self._file_format
        file_name = os.path.join(self._export_dir, self._get_date_folder(), file_name)
        return file_name

    def _get_date_folder(self):
        return date_utils.get_datetime_utc_now().strftime('%Y-%m-%d')
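    # Illustration (not in the original source): with the defaults above and
    # a hypothetical export_dir of /opt/st2/exports, _get_date_folder() and
    # _get_file_name() combine to produce paths such as:
    #
    #   /opt/st2/exports/2015-03-02/st2-executions-2015-03-02T05:56:00.000000Z.json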
    def _update_marker(self, batch):
        # Persist the newest end_timestamp in the batch as a high-water
        # mark recording how far the exporter has gotten.
        timestamps = [isotime.parse(item.end_timestamp) for item in batch]
        new_marker = max(timestamps)

        if self._persisted_marker and self._persisted_marker > new_marker:
            LOG.warn('Older executions are being exported. Perhaps out of order messages.')

        try:
            self._write_marker_to_db(new_marker)
        except:
            LOG.exception('Failed persisting dumper marker to db.')
        else:
            self._persisted_marker = new_marker

        return self._persisted_marker

    def _write_marker_to_db(self, new_marker):
        LOG.info('Updating marker in db to: %s', new_marker)
        markers = DumperMarker.get_all()

        if len(markers) > 1:
            LOG.exception('More than one dumper marker found. Using first found one.')

        marker = isotime.format(new_marker, offset=False)
        updated_at = date_utils.get_datetime_utc_now()

        if markers:
            marker_id = markers[0]['id']
        else:
            marker_id = None

        marker_db = DumperMarkerDB(id=marker_id, marker=marker, updated_at=updated_at)
        return DumperMarker.add_or_update(marker_db)
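
For context, a minimal usage sketch. The wiring is hypothetical (the listing does not show the producer side), and it assumes the queue payloads are execution objects exposing an end_timestamp attribute, which is what _update_marker reads:

# Hypothetical wiring, not part of the module above.
from eventlet.queue import Queue as EventletQueue

q = EventletQueue()
# export_dir must already exist; the constructor raises otherwise.
dumper = Dumper(queue=q, export_dir='/opt/st2/exports',
                batch_size=100, sleep_interval=5, max_files_per_sleep=2)
dumper.start(wait=False)
# Producers enqueue finished executions elsewhere: q.put(execution)
dumper.stop()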