Passed
Push — develop ( 0bb377...a0f65e )
by Plexxi
06:38 queued 02:43
created

ExecutionRetryPolicyApplicator   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 121
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 121
rs 10
c 1
b 0
f 0
wmc 13

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 17 1
C apply_after() 0 58 9
A _is_live_action_part_of_workflow_action() 0 11 1
A _get_live_action_retry_count() 0 12 1
A _re_run_live_action() 0 18 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import functools
18
19
import eventlet
20
21
from st2common import log as logging
22
from st2common.models.db.liveaction import LiveActionDB
23
import st2common.services.action as action_services
24
from st2common.constants.action import LIVEACTION_STATUS_FAILED
25
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
26
from st2common.util.enum import Enum
27
from st2common.policies.base import ResourcePolicyApplicator
28
29
# Names exported by this module.
__all__ = ['RetryOnPolicy', 'ExecutionRetryPolicyApplicator']

LOG = logging.getLogger(__name__)

# Live action statuses for which the retry policy is evaluated at all; any other status
# is skipped by the applicator.
VALID_RETRY_STATUSES = [LIVEACTION_STATUS_FAILED, LIVEACTION_STATUS_TIMED_OUT]
40
41
42
class RetryOnPolicy(Enum):
    """
    Conditions on which an execution can be retried.
    """

    # Retry on execution failure
    FAILURE = 'failure'

    # Retry on execution timeout
    TIMEOUT = 'timeout'
45
46
47
class ExecutionRetryPolicyApplicator(ResourcePolicyApplicator):
    """
    Policy applicator which requests a new execution of a live action when the original
    one finished in the status the policy is configured to retry on (failure or timeout).
    """

    def __init__(self, policy_ref, policy_type, retry_on, max_retry_count=2, delay=0):
        """
        :param policy_ref: Reference of the policy, passed through to the parent class.

        :param policy_type: Type of the policy, passed through to the parent class.

        :param retry_on: Condition to retry the execution on (failure, timeout).
        :type retry_on: ``str``

        :param max_retry_count: Maximum number of times to try to retry an action.
        :type max_retry_count: ``int``

        :param delay: How long to wait (in seconds) before retrying an execution.
        :type delay: ``float``
        """
        super(ExecutionRetryPolicyApplicator, self).__init__(policy_ref=policy_ref,
                                                             policy_type=policy_type)

        self.retry_on = retry_on
        self.max_retry_count = max_retry_count
        # Normalize falsy values (e.g. None) to 0 so the "delay > 0" comparison is safe
        self.delay = delay or 0

    def apply_after(self, target):
        """
        Re-run the provided live action if it finished in the status this policy retries on
        and the maximum retry count has not been reached yet.

        Live actions which are executed under a workflow are never retried.

        :param target: Live action the policy is applied to.

        :return: Target as returned by the parent class ``apply_after``.
        """
        target = super(ExecutionRetryPolicyApplicator, self).apply_after(target=target)

        live_action_db = target

        if self._is_live_action_part_of_workflow_action(live_action_db):
            LOG.warning(
                'Retry cannot be applied to this liveaction because it is executed under a '
                'workflow. Use workflow specific retry functionality where applicable. %s',
                live_action_db
            )

            return target

        retry_count = self._get_live_action_retry_count(live_action_db=live_action_db)

        extra = {'live_action_db': live_action_db, 'policy_ref': self._policy_ref,
                 'retry_on': self.retry_on, 'max_retry_count': self.max_retry_count,
                 'current_retry_count': retry_count}

        if live_action_db.status not in VALID_RETRY_STATUSES:
            # Only failed and timed out actions are candidates for a retry
            LOG.debug('Liveaction not in a valid retry state, not checking retry policy',
                      extra=extra)
            return target

        if (retry_count + 1) > self.max_retry_count:
            LOG.info('Maximum retry count has been reached, not retrying', extra=extra)
            return target

        # TODO: This is not crash and restart safe, switch to using "DELAYED"
        # status
        if self.delay > 0:
            # Schedule the re-run in a green thread once the delay has elapsed
            re_run_live_action = functools.partial(eventlet.spawn_after, self.delay,
                                                   self._re_run_live_action,
                                                   live_action_db=live_action_db)
        else:
            re_run_live_action = functools.partial(self._re_run_live_action,
                                                   live_action_db=live_action_db)

        # Retry condition which corresponds to the final status of the live action. The
        # condition value ('failure' / 'timeout') is also used as the key set on "extra" and
        # as the label in the log message.
        status_to_condition = {
            LIVEACTION_STATUS_FAILED: RetryOnPolicy.FAILURE,
            LIVEACTION_STATUS_TIMED_OUT: RetryOnPolicy.TIMEOUT
        }
        condition = status_to_condition.get(live_action_db.status, None)

        if condition is None or self.retry_on != condition:
            # Policy is configured for a different condition, nothing to do
            return target

        extra[condition] = True
        LOG.info('Policy matched (%s), retrying action execution in %s seconds...',
                 condition, self.delay, extra=extra)
        re_run_live_action()

        return target

    def _is_live_action_part_of_workflow_action(self, live_action_db):
        """
        Check if the provided live action has been executed under a workflow, i.e. if its
        context contains a non-empty ``parent`` value.

        :rtype: ``bool``
        """
        # Guard against a missing / None context
        context = getattr(live_action_db, 'context', None) or {}
        parent = context.get('parent', {})
        is_wf_action = (parent is not None and parent != {})

        return is_wf_action

    def _get_live_action_retry_count(self, live_action_db):
        """
        Retrieve current retry count for the provided live action.

        The count is read from ``context['policies']['retry']['retry_count']`` and defaults
        to 0 when the live action has not been retried yet.

        :rtype: ``int``
        """
        # TODO: Ideally we would store retry_count in zookeeper or similar and use locking so we
        # can run multiple instances of st2notifier
        context = getattr(live_action_db, 'context', None) or {}
        retry_count = context.get('policies', {}).get('retry', {}).get('retry_count', 0)

        return retry_count

    def _re_run_live_action(self, live_action_db):
        """
        Request a new execution with the same action and parameters as the provided live
        action and record the retry information in the context of the new live action.

        :return: Action execution object returned by the action service for the new request.
        """
        retry_count = self._get_live_action_retry_count(live_action_db=live_action_db)

        # Add additional policy specific info to the context. The original context is copied
        # so the live action which is being retried is left untouched.
        context = getattr(live_action_db, 'context', None) or {}
        new_context = copy.deepcopy(context)
        # NOTE(review): this replaces the whole "policies" entry, so any other policy info
        # present in the original context is not carried over - confirm this is intended.
        new_context['policies'] = {}
        new_context['policies']['retry'] = {
            'applied_policy': self._policy_ref,
            'retry_count': (retry_count + 1),
            'retried_liveaction_id': str(live_action_db.id)
        }
        action_ref = live_action_db.action
        parameters = live_action_db.parameters
        new_live_action_db = LiveActionDB(action=action_ref, parameters=parameters,
                                          context=new_context)
        _, action_execution_db = action_services.request(new_live_action_db)
        return action_execution_db
168