Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/services/executions.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
17
# contributor license agreements.  See the NOTICE file distributed with
18
# this work for additional information regarding copyright ownership.
19
# The ASF licenses this file to You under the Apache License, Version 2.0
20
# (the "License"); you may not use this file except in compliance with
21
# the License.  You may obtain a copy of the License at
22
#
23
#     http://www.apache.org/licenses/LICENSE-2.0
24
#
25
# Unless required by applicable law or agreed to in writing, software
26
# distributed under the License is distributed on an "AS IS" BASIS,
27
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28
# See the License for the specific language governing permissions and
29
# limitations under the License.
30
31
from __future__ import absolute_import
32
from oslo_config import cfg
33
import six
34
35
from st2common import log as logging
36
from st2common.util import date as date_utils
37
from st2common.util import reference
38
import st2common.util.action_db as action_utils
39
from st2common.constants import action as action_constants
40
from st2common.persistence.execution import ActionExecution
41
from st2common.persistence.runner import RunnerType
42
from st2common.persistence.rule import Rule
43
from st2common.persistence.trigger import TriggerType, Trigger, TriggerInstance
44
from st2common.models.api.action import RunnerTypeAPI, ActionAPI, LiveActionAPI
45
from st2common.models.api.rule import RuleAPI
46
from st2common.models.api.trigger import TriggerTypeAPI, TriggerAPI, TriggerInstanceAPI
47
from st2common.models.db.execution import ActionExecutionDB
48
from st2common.runners import utils as runners_utils
49
from six.moves import range
50
51
52
# Names exported by a star-import of this module.
__all__ = [
    'create_execution_object', 'update_execution',
    'abandon_execution_if_incomplete', 'is_execution_canceled',
    'AscendingSortedDescendantView', 'DFSDescendantView',
    'get_descendants',
]

# Module-level logger used by the functions below.
LOG = logging.getLogger(__name__)
63
64
# LiveAction specific attributes. When a LiveActionDB object is decomposed into an
# ActionExecution compatible dictionary, these attributes are nested under the "liveaction"
# key; every other attribute is stored at the top level of that dictionary.
LIVEACTION_ATTRIBUTES = [
    'id', 'callback', 'action', 'action_is_workflow',
    'runner_info', 'parameters', 'notify',
]
76
77
78
def _decompose_liveaction(liveaction_db):
    """
    Convert a liveaction into an ActionExecution compatible dictionary.

    Field names are taken from the API representation of the liveaction. Fields listed in
    LIVEACTION_ATTRIBUTES are copied (API value) under the "liveaction" key, all other fields
    are read from the DB object itself and stored at the top level.

    :param liveaction_db: Liveaction DB object to decompose.

    :rtype: dict
    """
    api_fields = vars(LiveActionAPI.from_model(liveaction_db))
    decomposed = {'liveaction': {}}

    for name, api_value in api_fields.items():
        if name not in LIVEACTION_ATTRIBUTES:
            # Not liveaction specific - take the value straight from the DB model.
            decomposed[name] = getattr(liveaction_db, name)
            continue

        decomposed['liveaction'][name] = api_value

    return decomposed
90
91
92
def _create_execution_log_entry(status):
    """
    Build a log entry recording the provided execution status together with the current
    timestamp as returned by ``date_utils.get_datetime_utc_now()``.

    :param status: Execution status to record.

    :rtype: dict
    """
    now = date_utils.get_datetime_utc_now()
    return dict(timestamp=now, status=status)
100
101
102
def create_execution_object(liveaction, publish=True):
    """
    Create and persist an ActionExecutionDB object for the provided liveaction.

    The execution is written twice: first (with publish=False) so that an id is available,
    then again once the ``web_url`` field has been populated. If the liveaction context
    references a parent execution which can be loaded, the id of the new execution is
    appended to the parent's ``children`` list.

    :param liveaction: Liveaction to create the execution object for.
    :param publish: Value of the ``publish`` argument used for the second (final) write.

    :return: The persisted execution object.
    """
    action_db = action_utils.get_action_by_ref(liveaction.action)
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])

    attrs = dict(
        action=vars(ActionAPI.from_model(action_db)),
        parameters=liveaction['parameters'],
        runner=vars(RunnerTypeAPI.from_model(runner_db)),
    )
    # Keys produced by the decomposition overwrite the entries set above.
    attrs.update(_decompose_liveaction(liveaction))

    context = liveaction.context

    if 'rule' in context:
        rule_db = reference.get_model_from_ref(Rule, context.get('rule', {}))
        attrs['rule'] = vars(RuleAPI.from_model(rule_db))

    if 'trigger_instance' in context:
        trigger_instance_ref = context.get('trigger_instance', {})
        trigger_instance_db = TriggerInstance.get_by_id(trigger_instance_ref.get('id', None))
        trigger_db = reference.get_model_by_resource_ref(db_api=Trigger,
                                                         ref=trigger_instance_db.trigger)
        trigger_type_db = reference.get_model_by_resource_ref(db_api=TriggerType,
                                                              ref=trigger_db.type)
        # The trigger instance is loaded once more through the generic reference lookup and
        # it is this object which gets serialized into the execution.
        trigger_instance_db = reference.get_model_from_ref(TriggerInstance,
                                                           trigger_instance_ref)
        attrs['trigger_instance'] = vars(TriggerInstanceAPI.from_model(trigger_instance_db))
        attrs['trigger'] = vars(TriggerAPI.from_model(trigger_db))
        attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type_db))

    parent = _get_parent_execution(liveaction)
    if parent:
        attrs['parent'] = str(parent.id)

    # The log starts with a single entry for the current liveaction status.
    attrs['log'] = [_create_execution_log_entry(liveaction['status'])]

    execution = ActionExecution.add_or_update(ActionExecutionDB(**attrs), publish=False)

    # The web URL contains the execution id which is only known after the first write, so
    # the field is filled in afterwards and the object is saved a second time.
    execution.web_url = _get_web_url_for_execution(str(execution.id))
    execution = ActionExecution.add_or_update(execution, publish=publish)

    if parent:
        execution_id = str(execution.id)
        if execution_id not in parent.children:
            parent.children.append(execution_id)
            ActionExecution.add_or_update(parent)

    return execution
152
153
154
def _get_parent_execution(child_liveaction_db):
155
    parent_context = child_liveaction_db.context.get('parent', None)
156
157
    if parent_context:
158
        parent_id = parent_context['execution_id']
159
        try:
160
            return ActionExecution.get_by_id(parent_id)
161
        except:
162
            LOG.exception('No valid execution object found in db for id: %s' % parent_id)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
163
            return None
164
    return None
165
166
167
def _get_web_url_for_execution(execution_id):
    """
    Build the web UI history URL for the provided execution id using the configured
    ``webui.webui_base_url`` value.

    :rtype: str
    """
    url_template = "%s/#/history/%s/general"
    return url_template % (cfg.CONF.webui.webui_base_url, execution_id)
170
171
172
def update_execution(liveaction_db, publish=True):
    """
    Synchronize the execution object which belongs to the provided liveaction.

    Every key of the decomposed liveaction is written with a ``set__<key>`` update. When the
    liveaction status differs from the stored execution status, a new entry is also pushed
    to the execution "log" attribute.

    :param liveaction_db: Liveaction whose values should be copied to its execution.
    :param publish: Passed through to ``ActionExecution.update``.

    :return: The value returned by ``ActionExecution.update``.
    """
    execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))

    kw = {
        'set__' + field: value
        for field, value in six.iteritems(_decompose_liveaction(liveaction_db))
    }

    status_changed = liveaction_db.status != execution.status
    if status_changed:
        # Record the status transition in the "log" attribute of the execution.
        kw['push__log'] = _create_execution_log_entry(liveaction_db.status)

    return ActionExecution.update(execution, publish=publish, **kw)
186
187
188
def abandon_execution_if_incomplete(liveaction_id, publish=True):
    """
    Mark the execution of the provided liveaction as abandoned if it has not completed yet.

    Abandoning an execution implies that its end state is unknown and can no longer be
    determined. This function should only be called when the owning process is certain it
    can no longer determine the status of the execution.

    :param liveaction_id: Id of the liveaction to abandon.
    :param publish: Passed through to ``update_execution``.

    :return: The updated execution object.
    :raises ValueError: If the liveaction is already in one of the completed states.
    """
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)

    # An action which is already complete can't be abandoned.
    current_status = liveaction_db.status
    if current_status in action_constants.LIVEACTION_COMPLETED_STATES:
        raise ValueError('LiveAction %s already in a completed state %s.' %
                         (liveaction_id, current_status))

    abandoned_status = action_constants.LIVEACTION_STATUS_ABANDONED

    # Flip the liveaction to the abandoned status (with an empty result) and mirror the
    # change on the corresponding execution object.
    liveaction_db = action_utils.update_liveaction_status(
        status=abandoned_status,
        liveaction_db=liveaction_db,
        result={})
    execution_db = update_execution(liveaction_db, publish=publish)

    LOG.info('Marked execution %s as %s.', execution_db.id, abandoned_status)

    # Run the post run operations of the action (such as callback).
    runners_utils.invoke_post_run(liveaction_db)

    return execution_db
217
218
219
def is_execution_canceled(execution_id):
    """
    Check whether the execution with the provided id has the canceled status.

    :param execution_id: Id of the execution to check.

    :return: True if the execution could be loaded and its status is
             LIVEACTION_STATUS_CANCELED, False otherwise (including when loading the
             execution or reading its status raised an exception).
    :rtype: bool
    """
    try:
        execution = ActionExecution.get_by_id(execution_id)
        return execution.status == action_constants.LIVEACTION_STATUS_CANCELED
    except Exception:
        # XXX: What to do here? For now any failure is reported as "not canceled".
        # Only Exception subclasses are handled so that KeyboardInterrupt, SystemExit and
        # other BaseException based signals propagate instead of being turned into False.
        return False
225
226
227
def get_parent_context(liveaction_db):
    """
    Return the "parent" entry of the liveaction context.

    :param liveaction_db: Object which may carry a ``context`` mapping.

    :return: The parent context if found, None otherwise (also when the object has no
             ``context`` attribute or the context is empty).
    :rtype: dict
    """
    ctx = getattr(liveaction_db, 'context', None)
    return ctx.get('parent', None) if ctx else None
238
239
240
class AscendingSortedDescendantView(object):
    """
    Collects executions and exposes them ordered by ascending ``start_timestamp``.
    """

    def __init__(self):
        # Executions in the order in which they were added.
        self._result = []

    def add(self, child):
        """
        Add an execution to the view.
        """
        self._result.append(child)

    @property
    def result(self):
        """
        Return a new list with the added executions sorted by ``start_timestamp``.
        """
        ordered = list(self._result)
        ordered.sort(key=lambda execution: execution.start_timestamp)
        return ordered
250
251
252
class DFSDescendantView(object):
    """
    Collects executions and exposes them in exactly the order in which they were added.
    """

    def __init__(self):
        # Executions in insertion order.
        self._result = []

    def add(self, child):
        """
        Add an execution to the view.
        """
        self._result.append(child)

    @property
    def result(self):
        """
        Return the list of added executions (the internal list itself, not a copy).
        """
        return self._result
262
263
264
# Maps a result format name to the view class used to collect descendant executions.
DESCENDANT_VIEWS = dict(
    sorted=AscendingSortedDescendantView,
    default=DFSDescendantView,
)
268
269
270
def get_descendants(actionexecution_id, descendant_depth=-1, result_fmt=None):
    """
    Return all the descendant executions of the supplied actionexecution_id, up to the
    specified descendant_depth.

    :param actionexecution_id: Id of the execution whose descendants should be returned.
    :param descendant_depth: Maximum depth to descend to; direct children are at depth 1.
                             With the default of -1 the depth is not limited.
    :param result_fmt: Key into DESCENDANT_VIEWS selecting how the result is presented.
                       Unknown values (and None) fall back to DFSDescendantView.

    :return: The ``result`` of the selected view.
    """
    view_cls = DESCENDANT_VIEWS.get(result_fmt, DFSDescendantView)
    descendants = view_cls()

    top_level = ActionExecution.query(parent=actionexecution_id, order_by=['start_timestamp'])
    LOG.debug('Found %s children for id %s.', len(top_level), actionexecution_id)

    # Work list of (execution, depth) tuples; the entry at the front is processed next.
    pending = [(execution, 1) for execution in top_level]

    while pending:
        execution, depth = pending.pop(0)
        descendants.add(execution)

        # Don't descend further for leaf executions or once the requested depth is reached.
        if not execution.children or (depth != -1 and depth == descendant_depth):
            continue

        execution_id = str(execution.id)
        nested = ActionExecution.query(parent=execution_id, order_by=['start_timestamp'])
        LOG.debug('Found %s children for id %s.', len(nested), execution_id)

        # Prepend the children (keeping their order) so traversal is depth first.
        for position, child in enumerate(nested):
            pending.insert(position, (child, depth + 1))

    return descendants.result
295