Passed
Push — master ( f1fe9e...5c5de8 )
by
unknown
03:44
created

purge_execution_output_objects()   B

Complexity

Conditions 6

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 6
c 2
b 0
f 0
dl 0
loc 33
rs 7.5384
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
"""
17
Module with utility functions for purging old action executions and
18
corresponding live action objects.
19
"""
20
21
import copy
22
23
from mongoengine.errors import InvalidQueryError
24
25
from st2common.constants import action as action_constants
26
from st2common.persistence.liveaction import LiveAction
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.execution import ActionExecutionOutput
29
30
# Public API of this module.
__all__ = [
    'purge_executions',
    'purge_execution_output_objects',
]
34
35
# Statuses used by purge_executions() to select executions when incomplete
# executions are not being purged.
DONE_STATES = [
    action_constants.LIVEACTION_STATUS_SUCCEEDED,
    action_constants.LIVEACTION_STATUS_FAILED,
    action_constants.LIVEACTION_STATUS_TIMED_OUT,
    action_constants.LIVEACTION_STATUS_CANCELED,
]
39
40
41
def purge_executions(logger, timestamp, action_ref=None, purge_incomplete=False):
    """
    Purge action executions and corresponding live action, execution output objects.

    Deletion is best-effort: an unexpected error while deleting one kind of
    object is logged and the remaining steps still run. Only an invalid query
    aborts the purge.

    :param logger: Logger-like object; ``info``, ``error`` and ``exception`` are called on it.

    :param timestamp: Executions older than this timestamp will be deleted.
    :type timestamp: ``datetime.datetime``

    :param action_ref: Only delete executions for the provided actions.
    :type action_ref: ``str``

    :param purge_incomplete: True to also delete executions which are not in a done state.
    :type purge_incomplete: ``bool``

    :raises ValueError: If no timestamp is provided.
    :raises InvalidQueryError: If one of the delete queries is rejected as invalid.
    """
    if not timestamp:
        raise ValueError('Specify a valid timestamp to purge.')

    logger.info('Purging executions older than timestamp: %s',
                timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))

    filters = {}

    if purge_incomplete:
        filters['start_timestamp__lt'] = timestamp
    else:
        filters['end_timestamp__lt'] = timestamp
        filters['start_timestamp__lt'] = timestamp
        filters['status'] = {'$in': DONE_STATES}

    # Executions and liveactions reference the action under different field names.
    exec_filters = copy.copy(filters)
    if action_ref:
        exec_filters['action__ref'] = action_ref

    liveaction_filters = copy.deepcopy(filters)
    if action_ref:
        liveaction_filters['action'] = action_ref

    to_delete_execution_dbs = []

    # 1. Delete ActionExecutionDB objects
    try:
        # Note: We call list() on the query set object because it's lazily evaluated otherwise
        # and the ids are needed after the objects have been deleted (see step 3).
        to_delete_execution_dbs = list(ActionExecution.query(**exec_filters))
        deleted_count = ActionExecution.delete_by_query(**exec_filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete execution instances: %s. '
               'Please contact support.' % (exec_filters, str(e)))
        raise InvalidQueryError(msg)
    except Exception:
        # Best-effort: log and carry on. KeyboardInterrupt / SystemExit are not swallowed.
        logger.exception('Deletion of execution models failed for query with filters: %s.',
                         exec_filters)
    else:
        logger.info('Deleted %s action execution objects', deleted_count)

    # 2. Delete LiveActionDB objects
    try:
        deleted_count = LiveAction.delete_by_query(**liveaction_filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete liveaction instances: %s. '
               'Please contact support.' % (liveaction_filters, str(e)))
        raise InvalidQueryError(msg)
    except Exception:
        logger.exception('Deletion of liveaction models failed for query with filters: %s.',
                         liveaction_filters)
    else:
        logger.info('Deleted %s liveaction objects', deleted_count)

    # 3. Delete ActionExecutionOutputDB objects
    to_delete_execution_ids = [str(execution_db.id) for execution_db in to_delete_execution_dbs]
    output_dbs_filters = {}
    output_dbs_filters['execution_id'] = {'$in': to_delete_execution_ids}

    try:
        deleted_count = ActionExecutionOutput.delete_by_query(**output_dbs_filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete execution output instances: %s. '
               'Please contact support.' % (output_dbs_filters, str(e)))
        raise InvalidQueryError(msg)
    except Exception:
        logger.exception('Deletion of execution output models failed for query with filters: %s.',
                         output_dbs_filters)
    else:
        logger.info('Deleted %s execution output objects', deleted_count)

    # Re-run the original queries to detect objects which survived the purge.
    zombie_execution_instances = len(ActionExecution.query(**exec_filters))
    zombie_liveaction_instances = len(LiveAction.query(**liveaction_filters))

    if (zombie_execution_instances > 0) or (zombie_liveaction_instances > 0):
        logger.error('Zombie execution instances left: %d.', zombie_execution_instances)
        logger.error('Zombie liveaction instances left: %s.', zombie_liveaction_instances)
    else:
        # Only claim success when nothing matching the filters is left behind.
        logger.info('All execution models older than timestamp %s were deleted.', timestamp)
133
134
135
def purge_execution_output_objects(logger, timestamp, action_ref=None):
    """
    Purge action executions output objects.

    Deletion is best-effort: an unexpected error is logged and not re-raised.
    Only an invalid query is propagated to the caller.

    :param logger: Logger-like object; ``info`` and ``exception`` are called on it.

    :param timestamp: Objects older than this timestamp will be deleted.
    :type timestamp: ``datetime.datetime``

    :param action_ref: Only delete objects for the provided actions.
    :type action_ref: ``str``

    :raises ValueError: If no timestamp is provided.
    :raises InvalidQueryError: If the delete query is rejected as invalid.
    """
    if not timestamp:
        raise ValueError('Specify a valid timestamp to purge.')

    logger.info('Purging action execution output objects older than timestamp: %s',
                timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))

    filters = {}
    filters['timestamp__lt'] = timestamp

    if action_ref:
        filters['action_ref'] = action_ref

    try:
        deleted_count = ActionExecutionOutput.delete_by_query(**filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete execution output instances: %s. '
               'Please contact support.' % (filters, str(e)))
        raise InvalidQueryError(msg)
    except Exception:
        # Best-effort: log and carry on. KeyboardInterrupt / SystemExit are not swallowed.
        logger.exception('Deletion of execution output models failed for query with filters: %s.',
                         filters)
    else:
        logger.info('Deleted %s execution output objects', deleted_count)
168