Passed
Push — master ( 10b726...2f8890 )
by
unknown
03:57
created

_create_execution_log_entry()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 7
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
17
# contributor license agreements.  See the NOTICE file distributed with
18
# this work for additional information regarding copyright ownership.
19
# The ASF licenses this file to You under the Apache License, Version 2.0
20
# (the "License"); you may not use this file except in compliance with
21
# the License.  You may obtain a copy of the License at
22
#
23
#     http://www.apache.org/licenses/LICENSE-2.0
24
#
25
# Unless required by applicable law or agreed to in writing, software
26
# distributed under the License is distributed on an "AS IS" BASIS,
27
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28
# See the License for the specific language governing permissions and
29
# limitations under the License.
30
31
import six
32
33
from st2common import log as logging
34
from st2common.util import date as date_utils
35
from st2common.util import reference
36
import st2common.util.action_db as action_utils
37
from st2common.constants import action as action_constants
38
from st2common.persistence.execution import ActionExecution
39
from st2common.persistence.runner import RunnerType
40
from st2common.persistence.rule import Rule
41
from st2common.persistence.trigger import TriggerType, Trigger, TriggerInstance
42
from st2common.models.api.action import RunnerTypeAPI, ActionAPI, LiveActionAPI
43
from st2common.models.api.rule import RuleAPI
44
from st2common.models.api.trigger import TriggerTypeAPI, TriggerAPI, TriggerInstanceAPI
45
from st2common.models.db.execution import ActionExecutionDB
46
47
__all__ = [
48
    'create_execution_object',
49
    'update_execution',
50
    'abandon_execution_if_incomplete',
51
    'is_execution_canceled',
52
    'AscendingSortedDescendantView',
53
    'DFSDescendantView',
54
    'get_descendants'
55
]
56
57
LOG = logging.getLogger(__name__)
58
59
# Names of the attributes which are placed under the "liveaction" key when a LiveActionDB
# object is decomposed into an ActionExecution compatible dictionary. Every other attribute
# is stored at the top level of that dictionary.
LIVEACTION_ATTRIBUTES = [
    'id',
    'callback',
    'action',
    'runner_info',
    'parameters',
    'notify'
]
63
64
65
def _decompose_liveaction(liveaction_db):
    """
    Split the provided liveaction into an ActionExecution compatible dict.

    The attribute names come from the API representation of the liveaction. Names listed
    in LIVEACTION_ATTRIBUTES are nested under the "liveaction" key using the API value;
    every other name is stored at the top level using the value read from the DB object.

    :param liveaction_db: LiveAction DB object to decompose.

    :rtype: ``dict``
    """
    api_attrs = vars(LiveActionAPI.from_model(liveaction_db))
    result = {'liveaction': {}}

    for name in api_attrs:
        if name not in LIVEACTION_ATTRIBUTES:
            # Top level attributes are taken from the DB model, not from the API object
            result[name] = getattr(liveaction_db, name)
            continue

        result['liveaction'][name] = api_attrs[name]

    return result
77
78
79
def _create_execution_log_entry(status):
    """
    Build a log entry for the provided execution status.

    :param status: Execution status to record.

    :return: Dictionary with the current UTC time under "timestamp" and the provided
             status under "status".
    :rtype: ``dict``
    """
    entry = dict(timestamp=date_utils.get_datetime_utc_now(), status=status)
    return entry
87
88
89
def create_execution_object(liveaction, publish=True):
    """
    Create and persist an ActionExecutionDB object for the provided liveaction.

    The execution is composed from the API representations of the action and runner, the
    decomposed liveaction and, when referenced in the liveaction context, the rule,
    trigger instance, trigger and trigger type. If the context references a parent
    execution which can be retrieved, the new execution id is also appended to the
    parent's children.

    :param liveaction: LiveAction DB object the execution is created for.
    :param publish: Passed through to ``ActionExecution.add_or_update`` when the new
                    execution is stored.

    :return: The stored execution object.
    """
    action_db = action_utils.get_action_by_ref(liveaction.action)
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])

    attrs = {
        'action': vars(ActionAPI.from_model(action_db)),
        'parameters': liveaction['parameters'],
        'runner': vars(RunnerTypeAPI.from_model(runner_db))
    }
    attrs.update(_decompose_liveaction(liveaction))

    context = liveaction.context

    if 'rule' in context:
        rule_db = reference.get_model_from_ref(Rule, context.get('rule', {}))
        attrs['rule'] = vars(RuleAPI.from_model(rule_db))

    if 'trigger_instance' in context:
        trigger_instance_ref = context.get('trigger_instance', {})
        trigger_instance_id = trigger_instance_ref.get('id', None)

        # The instance retrieved by id is only used to resolve the trigger and trigger type
        trigger_instance_db = TriggerInstance.get_by_id(trigger_instance_id)
        trigger_db = reference.get_model_by_resource_ref(db_api=Trigger,
                                                         ref=trigger_instance_db.trigger)
        trigger_type_db = reference.get_model_by_resource_ref(db_api=TriggerType,
                                                              ref=trigger_db.type)

        # The instance stored on the execution is resolved again from the context reference
        trigger_instance_db = reference.get_model_from_ref(
            TriggerInstance, context.get('trigger_instance', {}))

        attrs['trigger_instance'] = vars(TriggerInstanceAPI.from_model(trigger_instance_db))
        attrs['trigger'] = vars(TriggerAPI.from_model(trigger_db))
        attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type_db))

    parent = _get_parent_execution(liveaction)
    if parent:
        attrs['parent'] = str(parent.id)

    # The log starts with a single entry for the initial status
    attrs['log'] = [_create_execution_log_entry(liveaction['status'])]

    execution = ActionExecution.add_or_update(ActionExecutionDB(**attrs), publish=publish)

    if parent and str(execution.id) not in parent.children:
        parent.children.append(str(execution.id))
        ActionExecution.add_or_update(parent)

    return execution
133
134
135
def _get_parent_execution(child_liveaction_db):
    """
    Retrieve the parent execution referenced in the context of the provided liveaction.

    :param child_liveaction_db: LiveAction DB object whose context may contain a "parent"
                                entry with an "execution_id" key.

    :return: Parent execution object, or None if the context has no parent entry or the
             lookup by id fails.
    """
    parent_context = child_liveaction_db.context.get('parent', None)

    if not parent_context:
        return None

    parent_id = parent_context['execution_id']

    try:
        return ActionExecution.get_by_id(parent_id)
    except Exception:
        # A failed lookup is logged and treated as "no parent". Only Exception is caught so
        # that SystemExit / KeyboardInterrupt are not swallowed. The id is passed as a
        # logging argument instead of being formatted into the message eagerly.
        LOG.exception('No valid execution object found in db for id: %s', parent_id)
        return None
146
147
148
def update_execution(liveaction_db, publish=True):
    """
    Update the execution which corresponds to the provided liveaction.

    Every attribute of the decomposed liveaction is set on the execution. If the liveaction
    status differs from the execution status, a new entry is also pushed to the execution
    "log" attribute.

    :param liveaction_db: LiveAction DB object holding the new values.
    :param publish: Passed through to ``ActionExecution.update``.

    :return: Value returned by ``ActionExecution.update``.
    """
    execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))

    # NOTE(review): the "set__" / "push__" prefixes look like mongoengine style update
    # operators - confirm against ActionExecution.update
    update_kwargs = {
        'set__' + field: value
        for field, value in six.iteritems(_decompose_liveaction(liveaction_db))
    }

    status_changed = liveaction_db.status != execution_db.status
    if status_changed:
        # Status transitions are recorded in the "log" attribute of the action execution
        update_kwargs['push__log'] = _create_execution_log_entry(liveaction_db.status)

    return ActionExecution.update(execution_db, publish=publish, **update_kwargs)
162
163
164
def abandon_execution_if_incomplete(liveaction_id, publish=True):
    """
    Mark the execution as abandoned if it is still incomplete.

    Abandoning an execution implies that its end state is unknown and can no longer be
    determined. This function should only be called when the owning process is certain it
    can no longer determine the status of the execution.

    :param liveaction_id: Id of the liveaction to abandon.
    :param publish: Passed through to ``update_execution``.

    :return: The updated execution object.
    :raises ValueError: If the liveaction is already in one of the completed states.
    """
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)

    # An action which has already completed can't be abandoned
    already_completed = liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES
    if already_completed:
        raise ValueError('LiveAction %s already in a completed state %s.' %
                         (liveaction_id, liveaction_db.status))

    abandoned_status = action_constants.LIVEACTION_STATUS_ABANDONED
    liveaction_db = action_utils.update_liveaction_status(status=abandoned_status,
                                                          liveaction_db=liveaction_db,
                                                          result={})
    execution_db = update_execution(liveaction_db, publish=publish)

    LOG.info('Marked execution %s as %s.', execution_db.id, abandoned_status)
    return execution_db
184
185
186
def is_execution_canceled(execution_id):
    """
    Check whether the execution with the provided id is in the canceled state.

    :param execution_id: Id of the execution to check.

    :return: True if the execution status equals LIVEACTION_STATUS_CANCELED. False if it
             does not, or if retrieving the execution or reading its status raises.
    :rtype: ``bool``
    """
    try:
        execution = ActionExecution.get_by_id(execution_id)
        return execution.status == action_constants.LIVEACTION_STATUS_CANCELED
    except Exception:
        # Best effort: any failure is reported as "not canceled". Only Exception is caught
        # so that SystemExit / KeyboardInterrupt still propagate.
        return False  # XXX: What to do here?
192
193
194
def get_parent_context(liveaction_db):
    """
    Return the "parent" entry of the liveaction context.

    :param liveaction_db: Object whose ``context`` attribute (if any) is inspected.

    :return: Value stored under the "parent" key of the context. None if the object has no
             context, the context is empty or it has no "parent" key.
    :rtype: ``dict``
    """
    context = getattr(liveaction_db, 'context', None)
    return context.get('parent', None) if context else None
205
206
207
class AscendingSortedDescendantView(object):
    """
    Collects descendant executions and exposes them ordered by ascending start_timestamp.
    """

    def __init__(self):
        self._result = []

    def add(self, child):
        """
        Add a descendant execution to the view.
        """
        self._result += [child]

    @property
    def result(self):
        """
        Return a new list with the added executions sorted by their start_timestamp.
        """
        ordered = list(self._result)
        ordered.sort(key=lambda execution: execution.start_timestamp)
        return ordered
217
218
219
class DFSDescendantView(object):
    """
    Collects descendant executions and exposes them in the order in which they were added.
    """

    def __init__(self):
        self._result = []

    def add(self, child):
        """
        Add a descendant execution to the view.
        """
        self._result += [child]

    @property
    def result(self):
        """
        Return the internal list of added executions (not a copy), in insertion order.
        """
        collected = self._result
        return collected
229
230
231
# Maps a result format name to the view class used to collect descendant executions.
DESCENDANT_VIEWS = dict(
    sorted=AscendingSortedDescendantView,
    default=DFSDescendantView
)
235
236
237
def get_descendants(actionexecution_id, descendant_depth=-1, result_fmt=None):
    """
    Returns all descendant executions up to the specified descendant_depth for
    the supplied actionexecution_id.

    Descendants are visited depth first; the children of each execution are queried ordered
    by start_timestamp.

    :param actionexecution_id: Id of the execution whose descendants are returned.
    :param descendant_depth: Level at which the traversal stops descending. Direct children
                             are level 1. Values which never match a level (such as the
                             default -1) mean there is no depth limit.
    :param result_fmt: Key in DESCENDANT_VIEWS selecting how the result is presented.
                       Unknown values and None fall back to DFSDescendantView.

    :return: ``result`` of the selected descendant view.
    """
    descendants = DESCENDANT_VIEWS.get(result_fmt, DFSDescendantView)()
    children = ActionExecution.query(parent=actionexecution_id,
                                     **{'order_by': ['start_timestamp']})
    LOG.debug('Found %s children for id %s.', len(children), actionexecution_id)

    # Explicit DFS stack. Items are pushed in reverse so that pop() from the end yields
    # them in query order without shifting the rest of the list.
    stack = [(child, 1) for child in children]
    stack.reverse()

    while stack:
        parent, level = stack.pop()
        parent_id = str(parent.id)
        descendants.add(parent)

        if not parent.children:
            continue

        # Levels start at 1, so a non-positive descendant_depth never matches here
        if level == descendant_depth:
            continue

        children = ActionExecution.query(parent=parent_id, **{'order_by': ['start_timestamp']})
        LOG.debug('Found %s children for id %s.', len(children), parent_id)

        # Children of the current execution are visited before its remaining siblings.
        # NOTE(review): iterating the query result is assumed to yield the same items, in
        # the same order, as indexing it - confirm for the type ActionExecution.query returns
        stack.extend(reversed([(child, level + 1) for child in children]))

    return descendants.result
262