Passed
Push — develop ( f1fe9e...5c5de8 )
by
unknown
06:59 queued 03:36
created

GarbageCollectorService.run()   A

Complexity

Conditions 3

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 21
rs 9.3142
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
"""
Garbage collection service which deletes old data from the database.
"""
20
import signal
21
import datetime
22
import random
23
24
import eventlet
25
from eventlet.support import greenlets as greenlet
26
from oslo_config import cfg
27
28
from st2common import log as logging
29
from st2common.constants.exit_codes import SUCCESS_EXIT_CODE
30
from st2common.constants.exit_codes import FAILURE_EXIT_CODE
31
from st2common.constants.garbage_collection import DEFAULT_COLLECTION_INTERVAL
32
from st2common.constants.garbage_collection import DEFAULT_SLEEP_DELAY
33
from st2common.constants.garbage_collection import MINIMUM_TTL_DAYS
34
from st2common.constants.garbage_collection import MINIMUM_TTL_DAYS_EXECUTION_OUTPUT
35
from st2common.util import isotime
36
from st2common.util.date import get_datetime_utc_now
37
from st2common.garbage_collection.executions import purge_executions
38
from st2common.garbage_collection.executions import purge_execution_output_objects
39
from st2common.garbage_collection.trigger_instances import purge_trigger_instances
40
41
# Public API of this module.
__all__ = [
    'GarbageCollectorService'
]

LOG = logging.getLogger(__name__)


class GarbageCollectorService(object):
    """
    Service which periodically purges old action executions, action execution output objects
    and trigger instances from the database, based on the TTL values (in days) configured in
    the ``garbagecollector`` config section.
    """

    def __init__(self, collection_interval=DEFAULT_COLLECTION_INTERVAL,
                 sleep_delay=DEFAULT_SLEEP_DELAY):
        """
        :param collection_interval: How often to check database for old data and perform garbage
               collection.
        :type collection_interval: ``int``

        :param sleep_delay: How long to sleep (in seconds) between collection of different object
                            types.
        :type sleep_delay: ``int``

        :raises ValueError: If a configured TTL value is lower than its allowed minimum.
        """
        self._collection_interval = collection_interval

        # TTL values in days. A falsy value (e.g. None) means collection for that object type
        # is not configured and will be skipped.
        self._action_executions_ttl = cfg.CONF.garbagecollector.action_executions_ttl
        self._action_executions_output_ttl = cfg.CONF.garbagecollector.action_executions_output_ttl
        self._trigger_instances_ttl = cfg.CONF.garbagecollector.trigger_instances_ttl

        self._validate_ttl_values()

        self._sleep_delay = sleep_delay

        # Initialize here so the attribute exists even if shutdown() is called before run()
        self._running = False

    def run(self):
        """
        Run the garbage collection loop until shutdown() is called or the greenlet is killed.

        :return: ``SUCCESS_EXIT_CODE`` on clean exit, ``FAILURE_EXIT_CODE`` if the main loop
                 raised an unexpected exception.
        """
        self._running = True

        self._register_signal_handlers()

        # Wait a couple of seconds before performing initial collection to prevent thundering herd
        # effect when restarting multiple services at the same time
        jitter_seconds = random.uniform(0, 3)
        eventlet.sleep(jitter_seconds)

        try:
            self._main_loop()
        except greenlet.GreenletExit:
            self._running = False
            return SUCCESS_EXIT_CODE
        except Exception as e:
            LOG.exception('Exception in the garbage collector: %s', e)
            self._running = False
            return FAILURE_EXIT_CODE

        return SUCCESS_EXIT_CODE

    def _register_signal_handlers(self):
        """
        Install a SIGUSR2 handler which forces an immediate garbage collection run.
        """
        signal.signal(signal.SIGUSR2, self.handle_sigusr2)

    def handle_sigusr2(self, signal_number, stack_frame):
        """
        SIGUSR2 signal handler - performs a garbage collection run right away.
        """
        LOG.info('Forcing garbage collection...')
        self._perform_garbage_collection()

    def shutdown(self):
        """
        Ask the main loop to stop after the current iteration.
        """
        self._running = False

    def _main_loop(self):
        """
        Perform garbage collection every ``collection_interval`` seconds while running.
        """
        while self._running:
            self._perform_garbage_collection()

            LOG.info('Sleeping for %s seconds before next garbage collection...',
                     self._collection_interval)
            eventlet.sleep(self._collection_interval)

    def _validate_ttl_values(self):
        """
        Validate that a user has supplied reasonable TTL values.

        Unset (falsy) TTL values are allowed and mean "don't collect this object type".

        :raises ValueError: If a set TTL value is lower than its allowed minimum.
        """
        if self._action_executions_ttl and self._action_executions_ttl < MINIMUM_TTL_DAYS:
            raise ValueError('Minimum possible TTL for action_executions_ttl in days is %s' %
                             (MINIMUM_TTL_DAYS))

        if self._trigger_instances_ttl and self._trigger_instances_ttl < MINIMUM_TTL_DAYS:
            raise ValueError('Minimum possible TTL for trigger_instances_ttl in days is %s' %
                             (MINIMUM_TTL_DAYS))

        if self._action_executions_output_ttl and \
                self._action_executions_output_ttl < MINIMUM_TTL_DAYS_EXECUTION_OUTPUT:
            raise ValueError(('Minimum possible TTL for action_executions_output_ttl in days '
                              'is %s') % (MINIMUM_TTL_DAYS_EXECUTION_OUTPUT))

    @staticmethod
    def _is_ttl_configured(ttl, minimum_ttl):
        """
        Return True if ``ttl`` is set and at least ``minimum_ttl`` days.
        """
        # Check truthiness first: comparing None with an int raises TypeError on Python 3
        return bool(ttl) and ttl >= minimum_ttl

    def _perform_garbage_collection(self):
        """
        Purge every object type which has a TTL configured, skipping the others.
        """
        LOG.info('Performing garbage collection...')

        # Note: We sleep for a bit between garbage collection of each object type to prevent busy
        # waiting
        if self._is_ttl_configured(self._action_executions_ttl, MINIMUM_TTL_DAYS):
            self._purge_action_executions()
            eventlet.sleep(self._sleep_delay)
        else:
            LOG.debug('Skipping garbage collection for action executions since it\'s not '
                      'configured')

        if self._is_ttl_configured(self._action_executions_output_ttl,
                                   MINIMUM_TTL_DAYS_EXECUTION_OUTPUT):
            self._purge_action_executions_output()
            eventlet.sleep(self._sleep_delay)
        else:
            LOG.debug('Skipping garbage collection for action executions output since it\'s not '
                      'configured')

        if self._is_ttl_configured(self._trigger_instances_ttl, MINIMUM_TTL_DAYS):
            self._purge_trigger_instances()
            eventlet.sleep(self._sleep_delay)
        else:
            LOG.debug('Skipping garbage collection for trigger instances since it\'s not '
                      'configured')

    def _get_purge_timestamp(self, ttl_days, minimum_ttl_days):
        """
        Compute the cut-off timestamp: objects older than it should be deleted.

        :param ttl_days: Configured TTL (in days) for the object type.
        :param minimum_ttl_days: Smallest TTL (in days) allowed for the object type.

        :raises ValueError: If the cut-off would be newer than ``minimum_ttl_days`` ago, or not
                            in the past.
        """
        utc_now = get_datetime_utc_now()
        timestamp = (utc_now - datetime.timedelta(days=ttl_days))

        # Another sanity check to make sure we don't delete new objects
        if timestamp > (utc_now - datetime.timedelta(days=minimum_ttl_days)):
            raise ValueError('Calculated timestamp would violate the minimum TTL constraint')

        # Explicit check rather than ``assert`` so it is not stripped when running with -O
        if timestamp >= utc_now:
            raise ValueError('Calculated timestamp must be in the past')

        return timestamp

    def _purge_action_executions(self):
        """
        Purge action executions and corresponding live action, stdout and stderr object which match
        the criteria defined in the config.

        Failures from the purge call itself are logged and not re-raised.
        """
        LOG.info('Performing garbage collection for action executions and related objects')

        timestamp = self._get_purge_timestamp(ttl_days=self._action_executions_ttl,
                                              minimum_ttl_days=MINIMUM_TTL_DAYS)
        timestamp_str = isotime.format(dt=timestamp)
        LOG.info('Deleting action executions older than: %s', timestamp_str)

        try:
            purge_executions(logger=LOG, timestamp=timestamp)
        except Exception as e:
            LOG.exception('Failed to delete executions: %s', e)

        return True

    def _purge_action_executions_output(self):
        """
        Purge action execution output objects older than the configured output TTL.

        Failures from the purge call itself are logged and not re-raised.
        """
        LOG.info('Performing garbage collection for action executions output objects')

        timestamp = self._get_purge_timestamp(ttl_days=self._action_executions_output_ttl,
                                              minimum_ttl_days=MINIMUM_TTL_DAYS_EXECUTION_OUTPUT)
        timestamp_str = isotime.format(dt=timestamp)
        LOG.info('Deleting action executions output objects older than: %s', timestamp_str)

        try:
            purge_execution_output_objects(logger=LOG, timestamp=timestamp)
        except Exception as e:
            LOG.exception('Failed to delete execution output objects: %s', e)

        return True

    def _purge_trigger_instances(self):
        """
        Purge trigger instances which match the criteria defined in the config.

        Failures from the purge call itself are logged and not re-raised.
        """
        LOG.info('Performing garbage collection for trigger instances')

        timestamp = self._get_purge_timestamp(ttl_days=self._trigger_instances_ttl,
                                              minimum_ttl_days=MINIMUM_TTL_DAYS)
        timestamp_str = isotime.format(dt=timestamp)
        LOG.info('Deleting trigger instances older than: %s', timestamp_str)

        try:
            purge_trigger_instances(logger=LOG, timestamp=timestamp)
        except Exception as e:
            LOG.exception('Failed to delete trigger instances: %s', e)

        return True