Completed
Pull Request — master (#2299)
by Manas
06:53
created

st2common.services.abandon_execution_if_incomplete()   A

Complexity

Conditions 2

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 18
rs 9.4286
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
17
# contributor license agreements.  See the NOTICE file distributed with
18
# this work for additional information regarding copyright ownership.
19
# The ASF licenses this file to You under the Apache License, Version 2.0
20
# (the "License"); you may not use this file except in compliance with
21
# the License.  You may obtain a copy of the License at
22
#
23
#     http://www.apache.org/licenses/LICENSE-2.0
24
#
25
# Unless required by applicable law or agreed to in writing, software
26
# distributed under the License is distributed on an "AS IS" BASIS,
27
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28
# See the License for the specific language governing permissions and
29
# limitations under the License.
30
31
import six
32
33
from st2common import log as logging
34
from st2common.util import reference
35
import st2common.util.action_db as action_utils
36
from st2common.constants import action as action_constants
37
from st2common.persistence.execution import ActionExecution
38
from st2common.persistence.runner import RunnerType
39
from st2common.persistence.rule import Rule
40
from st2common.persistence.trigger import TriggerType, Trigger, TriggerInstance
41
from st2common.models.api.action import RunnerTypeAPI, ActionAPI, LiveActionAPI
42
from st2common.models.api.rule import RuleAPI
43
from st2common.models.api.trigger import TriggerTypeAPI, TriggerAPI, TriggerInstanceAPI
44
from st2common.models.db.execution import ActionExecutionDB
45
46
# Public names exported by a star-import of this module.
__all__ = [
    'create_execution_object',
    'update_execution',
    'abandon_execution_if_incomplete',
    'is_execution_canceled',
    'AscendingSortedDescendantView',
    'DFSDescendantView',
    'get_descendants',
]

# Module-level logger, keyed on this module's name.
LOG = logging.getLogger(__name__)

# LiveAction API fields that _decompose_liveaction() stores under the nested
# 'liveaction' key rather than at the top level of the decomposed dict.
SKIPPED = ['id', 'callback', 'action', 'runner_info']
59
60
61
def _decompose_liveaction(liveaction_db):
    """
    Splits the liveaction into an ActionExecution compatible dict.

    Fields listed in SKIPPED are copied from the API representation into the
    nested ``'liveaction'`` entry; every other field is read directly from
    ``liveaction_db`` and placed at the top level of the returned dict.
    """
    api_fields = vars(LiveActionAPI.from_model(liveaction_db))
    result = {'liveaction': {}}
    for field, api_value in api_fields.items():
        if field not in SKIPPED:
            # Top-level fields come from the DB model, not the API object.
            result[field] = getattr(liveaction_db, field)
            continue
        result['liveaction'][field] = api_value
    return result
73
74
75
def create_execution_object(liveaction, publish=True):
    """
    Build and persist the ActionExecution record for ``liveaction``.

    The record combines the API representations of the action and its runner,
    the decomposed liveaction and, when the liveaction context names them,
    the rule, trigger instance, trigger and trigger type. If the context
    points at a parent execution, the new execution's id is also appended to
    the parent's ``children`` and the parent is saved.

    :param liveaction: Liveaction the execution is created for.
    :param publish: Forwarded to ``ActionExecution.add_or_update`` when the
        new execution is saved.
    :return: The saved execution.
    """
    action_db = action_utils.get_action_by_ref(liveaction.action)
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])

    attrs = dict(
        action=vars(ActionAPI.from_model(action_db)),
        runner=vars(RunnerTypeAPI.from_model(runner_db)),
    )
    attrs.update(_decompose_liveaction(liveaction))

    if 'rule' in liveaction.context:
        rule_ref = liveaction.context.get('rule', {})
        rule_db = reference.get_model_from_ref(Rule, rule_ref)
        attrs['rule'] = vars(RuleAPI.from_model(rule_db))

    if 'trigger_instance' in liveaction.context:
        instance_ref = liveaction.context.get('trigger_instance', {})
        instance_db = TriggerInstance.get_by_id(instance_ref.get('id', None))
        trigger_db = reference.get_model_by_resource_ref(
            db_api=Trigger, ref=instance_db.trigger)
        trigger_type_db = reference.get_model_by_resource_ref(
            db_api=TriggerType, ref=trigger_db.type)
        # The instance is looked up a second time through the generic
        # reference helper; that result is the one recorded on the execution.
        instance_db = reference.get_model_from_ref(TriggerInstance, instance_ref)
        attrs['trigger_instance'] = vars(TriggerInstanceAPI.from_model(instance_db))
        attrs['trigger'] = vars(TriggerAPI.from_model(trigger_db))
        attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type_db))

    parent = _get_parent_execution(liveaction)
    if parent:
        attrs['parent'] = str(parent.id)

    execution = ActionExecution.add_or_update(ActionExecutionDB(**attrs),
                                              publish=publish)

    if parent:
        execution_id = str(execution.id)
        # Register the child on the parent only once.
        if execution_id not in parent.children:
            parent.children.append(execution_id)
            ActionExecution.add_or_update(parent)

    return execution
116
117
118
def _get_parent_execution(child_liveaction_db):
    """
    Look up the parent execution referenced by a liveaction's context.

    :param child_liveaction_db: Liveaction whose ``context`` may hold a
        ``'parent'`` entry containing an ``'execution_id'``.
    :return: The parent execution, or ``None`` when the context names no
        parent or the lookup raises.
    """
    parent_context = child_liveaction_db.context.get('parent', None)
    if not parent_context:
        return None

    parent_id = parent_context['execution_id']
    try:
        return ActionExecution.get_by_id(parent_id)
    except Exception:
        # Best effort: a failed lookup is logged and reported as "no parent".
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate. The id is passed as a logging argument so the
        # message is formatted by the logging machinery, not eagerly here.
        LOG.exception('No valid execution object found in db for id: %s', parent_id)
        return None
129
130
131
def update_execution(liveaction_db, publish=True):
    """
    Copy the current state of ``liveaction_db`` onto its stored execution.

    The execution is fetched by the liveaction's id, every key of the
    decomposed liveaction is set on it as an attribute, and it is then saved.

    :param liveaction_db: Liveaction whose execution should be refreshed.
    :param publish: Forwarded to ``ActionExecution.add_or_update``.
    :return: The saved execution.
    """
    execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))
    for field, value in six.iteritems(_decompose_liveaction(liveaction_db)):
        setattr(execution, field, value)
    return ActionExecution.add_or_update(execution, publish=publish)
138
139
140
def abandon_execution_if_incomplete(liveaction_id, publish=True):
    """
    Marks execution as abandoned if it is still incomplete. Abandoning an
    execution implies that its end state is unknown and can no longer be
    determined. This method should only be called if the owning process
    is certain it can no longer determine the status of an execution.

    :param liveaction_id: Id of the liveaction to inspect.
    :param publish: Forwarded to ``update_execution`` when the execution is
        updated.
    """
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)
    # Only an action that has not already completed needs to be abandoned.
    if liveaction_db.status not in action_constants.LIVEACTION_COMPLETED_STATES:
        liveaction_db = action_utils.update_liveaction_status(
            status=action_constants.LIVEACTION_STATUS_ABANDONED,
            liveaction_db=liveaction_db,
            result={})
        execution_db = update_execution(liveaction_db, publish=publish)
        LOG.info('Marked execution %s as %s.', execution_db.id,
                 action_constants.LIVEACTION_STATUS_ABANDONED)
158
159
160
def is_execution_canceled(execution_id):
    """
    Report whether the execution with ``execution_id`` has canceled status.

    :param execution_id: Id of the execution to look up.
    :return: ``True`` when the stored status equals
        ``LIVEACTION_STATUS_CANCELED``; ``False`` otherwise, including when
        the lookup or status read raises.
    """
    try:
        execution = ActionExecution.get_by_id(execution_id)
        return execution.status == action_constants.LIVEACTION_STATUS_CANCELED
    except Exception:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit are not silently converted into "not canceled".
        return False  # XXX: What to do here?
166
167
168
class AscendingSortedDescendantView(object):
    """
    Collects executions and exposes them ordered by ascending
    ``start_timestamp``.
    """

    def __init__(self):
        # Kept in insertion order; sorting happens when ``result`` is read.
        self._result = []

    def add(self, child):
        """Record ``child`` in the view."""
        self._result.append(child)

    @property
    def result(self):
        """Return a new list of the recorded items sorted by start_timestamp."""
        ordered = list(self._result)
        ordered.sort(key=lambda execution: execution.start_timestamp)
        return ordered
178
179
180
class DFSDescendantView(object):
    """
    Collects executions and exposes them in the order they were added.
    """

    def __init__(self):
        # Items in insertion order; returned as-is by ``result``.
        self._result = list()

    def add(self, child):
        """Record ``child`` at the end of the view."""
        self._result.append(child)

    @property
    def result(self):
        """Return the underlying list of recorded items, unsorted."""
        return self._result
190
191
192
# Maps a ``result_fmt`` name to the view class that collects descendants.
DESCENDANT_VIEWS = dict(
    sorted=AscendingSortedDescendantView,
    default=DFSDescendantView,
)
196
197
198
def get_descendants(actionexecution_id, descendant_depth=-1, result_fmt=None):
    """
    Returns all descendant executions up to the specified descendant_depth for
    the supplied actionexecution_id.

    :param actionexecution_id: Id of the execution whose descendants are
        collected.
    :param descendant_depth: Level at which descent stops; direct children
        are level 1. The default of -1 never matches a level, so the walk is
        unbounded.
    :param result_fmt: Key into DESCENDANT_VIEWS choosing the view class;
        a missing or unknown key falls back to DFSDescendantView.
    :return: The ``result`` of the chosen view.
    """
    descendants = DESCENDANT_VIEWS.get(result_fmt, DFSDescendantView)()
    children = ActionExecution.query(parent=actionexecution_id,
                                     order_by=['start_timestamp'])
    LOG.debug('Found %s children for id %s.', len(children), actionexecution_id)
    pending = [(child, 1) for child in children]

    while pending:
        execution, level = pending.pop(0)
        execution_id = str(execution.id)
        descendants.add(execution)
        if not execution.children:
            continue
        # -1 is the "no limit" sentinel of descendant_depth; levels start at
        # 1, so the sentinel is checked on the depth argument, not the level.
        if descendant_depth != -1 and level == descendant_depth:
            continue
        children = ActionExecution.query(parent=execution_id,
                                         order_by=['start_timestamp'])
        LOG.debug('Found %s children for id %s.', len(children), execution_id)
        # Prepend in order so the next item processed is this execution's
        # first child, which keeps the walk depth-first.
        pending[0:0] = [(child, level + 1) for child in children]
    return descendants.result
223