Completed
Pull Request — master (#2299)
by Manas
05:57
created

st2common.services.abandon_execution_if_incomplete()   A

Complexity

Conditions 2

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 18
rs 9.4286
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
17
# contributor license agreements.  See the NOTICE file distributed with
18
# this work for additional information regarding copyright ownership.
19
# The ASF licenses this file to You under the Apache License, Version 2.0
20
# (the "License"); you may not use this file except in compliance with
21
# the License.  You may obtain a copy of the License at
22
#
23
#     http://www.apache.org/licenses/LICENSE-2.0
24
#
25
# Unless required by applicable law or agreed to in writing, software
26
# distributed under the License is distributed on an "AS IS" BASIS,
27
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28
# See the License for the specific language governing permissions and
29
# limitations under the License.
30
31
import six
32
33
from st2common import log as logging
34
from st2common.util import reference
35
import st2common.util.action_db as action_utils
36
from st2common.constants import action as action_constants
37
from st2common.persistence.execution import ActionExecution
38
from st2common.persistence.runner import RunnerType
39
from st2common.persistence.rule import Rule
40
from st2common.persistence.trigger import TriggerType, Trigger, TriggerInstance
41
from st2common.models.api.action import RunnerTypeAPI, ActionAPI, LiveActionAPI
42
from st2common.models.api.rule import RuleAPI
43
from st2common.models.api.trigger import TriggerTypeAPI, TriggerAPI, TriggerInstanceAPI
44
from st2common.models.db.execution import ActionExecutionDB
45
46
# Names exported by ``from <this module> import *``.
__all__ = [
    'create_execution_object',
    'update_execution',
    'abandon_execution_if_incomplete',
    'is_execution_canceled',
    'AscendingSortedDescendantView',
    'DFSDescendantView',
    'get_descendants',
]

LOG = logging.getLogger(__name__)

# LiveAction API attributes which _decompose_liveaction stores under the
# 'liveaction' key rather than at the top level of the decomposed dict.
SKIPPED = ['id', 'callback', 'action', 'runner_info']
59
60
61
def _decompose_liveaction(liveaction_db):
    """
    Build an ActionExecution compatible dict out of a liveaction.

    Attributes listed in SKIPPED are copied from the LiveActionAPI
    representation into the nested 'liveaction' dict. Every other attribute
    is read directly from the db model and stored at the top level.
    """
    api_attrs = vars(LiveActionAPI.from_model(liveaction_db))
    result = {'liveaction': {}}
    for name, api_value in api_attrs.items():
        if name not in SKIPPED:
            # Top level values come from the db model, not the API object.
            result[name] = getattr(liveaction_db, name)
            continue
        result['liveaction'][name] = api_value
    return result
73
74
75
def create_execution_object(liveaction, publish=True):
    """
    Create and store the ActionExecution object for the supplied liveaction.

    The execution is assembled from the action and runner referenced by the
    liveaction and from the decomposed liveaction itself. When the liveaction
    context carries them, the rule and the trigger instance / trigger /
    trigger type are added as well. If the context references a parent
    execution, the new execution records the parent id and its own id is
    appended to the parent's children.

    :param liveaction: Liveaction to create the execution for.
    :param publish: Passed through to ActionExecution.add_or_update when the
                    new execution is stored.

    :return: The stored execution object.
    """
    action_db = action_utils.get_action_by_ref(liveaction.action)
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])

    attrs = {
        'action': vars(ActionAPI.from_model(action_db)),
        'runner': vars(RunnerTypeAPI.from_model(runner_db))
    }
    attrs.update(_decompose_liveaction(liveaction))

    context = liveaction.context

    if 'rule' in context:
        rule_db = reference.get_model_from_ref(Rule, context.get('rule', {}))
        attrs['rule'] = vars(RuleAPI.from_model(rule_db))

    if 'trigger_instance' in context:
        trigger_instance_ref = context.get('trigger_instance', {})
        # The instance fetched by id is only used to resolve the trigger and
        # its type; the instance stored on the execution is resolved from the
        # context reference below.
        instance_by_id = TriggerInstance.get_by_id(trigger_instance_ref.get('id', None))
        trigger_db = reference.get_model_by_resource_ref(db_api=Trigger,
                                                         ref=instance_by_id.trigger)
        trigger_type_db = reference.get_model_by_resource_ref(db_api=TriggerType,
                                                              ref=trigger_db.type)
        trigger_instance_db = reference.get_model_from_ref(TriggerInstance,
                                                           trigger_instance_ref)
        attrs['trigger_instance'] = vars(TriggerInstanceAPI.from_model(trigger_instance_db))
        attrs['trigger'] = vars(TriggerAPI.from_model(trigger_db))
        attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type_db))

    parent = _get_parent_execution(liveaction)
    if parent:
        attrs['parent'] = str(parent.id)

    execution = ActionExecution.add_or_update(ActionExecutionDB(**attrs), publish=publish)

    if parent:
        execution_id = str(execution.id)
        # Only touch the parent when it does not list this child yet.
        if execution_id not in parent.children:
            parent.children.append(execution_id)
            ActionExecution.add_or_update(parent)

    return execution
116
117
118
def _get_parent_execution(child_liveaction_db):
    """
    Look up the parent execution referenced by a liveaction's context.

    :param child_liveaction_db: Liveaction whose ``context`` may contain a
                                'parent' entry holding an 'execution_id'.

    :return: The parent execution, or None when the context has no parent
             entry or the lookup raises.
    """
    parent_context = child_liveaction_db.context.get('parent', None)
    if not parent_context:
        return None

    parent_id = parent_context['execution_id']
    try:
        return ActionExecution.get_by_id(parent_id)
    except Exception:
        # Best effort: a failed lookup is logged and treated as "no parent".
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt / SystemExit propagate. The id is handed to the
        # logger as an argument so formatting is deferred to the logger.
        LOG.exception('No valid execution object found in db for id: %s', parent_id)
        return None
129
130
131
def update_execution(liveaction_db, publish=True):
    """
    Copy the current state of a liveaction onto its execution and store it.

    :param liveaction_db: Liveaction whose execution should be refreshed. The
                          execution is looked up by the liveaction id.
    :param publish: Passed through to ActionExecution.add_or_update.

    :return: The stored execution object.
    """
    execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
    fresh_attrs = _decompose_liveaction(liveaction_db)
    for attr_name, attr_value in six.iteritems(fresh_attrs):
        setattr(execution_db, attr_name, attr_value)
    return ActionExecution.add_or_update(execution_db, publish=publish)
138
139
140
def abandon_execution_if_incomplete(liveaction_id, publish=True):
    """
    Flags an execution as abandoned unless it has already completed.

    Abandoning an execution implies that its end state is unknown and can no
    longer be determined. This method should only be called if the owning
    process is certain it can no longer determine the status of the execution.

    :param liveaction_id: Id of the liveaction backing the execution.
    :param publish: Passed through to update_execution.
    """
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)

    if liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES:
        # An already completed action does not need to be abandoned.
        return

    abandoned = action_constants.LIVEACTION_STATUS_ABANDONED
    liveaction_db = action_utils.update_liveaction_status(status=abandoned,
                                                          liveaction_db=liveaction_db,
                                                          result={})
    execution_db = update_execution(liveaction_db, publish=publish)
    LOG.info('Marked execution %s as %s.', execution_db.id, abandoned)
158
159
def is_execution_canceled(execution_id):
    """
    Tell whether the execution with the supplied id is in the canceled state.

    :param execution_id: Id of the execution to check.

    :return: True if the execution status equals LIVEACTION_STATUS_CANCELED.
             False otherwise, including when the lookup or the status check
             raises.
    """
    try:
        execution = ActionExecution.get_by_id(execution_id)
        return execution.status == action_constants.LIVEACTION_STATUS_CANCELED
    except Exception:
        # Best effort: any failure is reported as "not canceled". Catching
        # Exception rather than using a bare except lets KeyboardInterrupt /
        # SystemExit propagate.
        return False  # XXX: What to do here?
165
166
167
class AscendingSortedDescendantView(object):
    """
    Collects descendant executions and exposes them ordered by ascending
    start_timestamp.
    """

    def __init__(self):
        self._result = list()

    def add(self, child):
        """
        Record a descendant execution.
        """
        self._result.append(child)

    @property
    def result(self):
        """
        Return a new list of the recorded executions sorted by their
        start_timestamp.
        """
        ordered = list(self._result)
        ordered.sort(key=lambda item: item.start_timestamp)
        return ordered
177
178
179
class DFSDescendantView(object):
    """
    Collects descendant executions and exposes them in the order in which
    they were added.
    """

    def __init__(self):
        self._result = list()

    def add(self, child):
        """
        Record a descendant execution.
        """
        self._result.append(child)

    @property
    def result(self):
        """
        Return the list of recorded executions in insertion order.
        """
        collected = self._result
        return collected
189
190
191
# Maps a result_fmt value accepted by get_descendants to the view class used
# to collect the descendants.
DESCENDANT_VIEWS = {
    'sorted': AscendingSortedDescendantView,
    'default': DFSDescendantView,
}
195
196
197
def get_descendants(actionexecution_id, descendant_depth=-1, result_fmt=None):
    """
    Returns all descendant executions upto the specified descendant_depth for
    the supplied actionexecution_id.

    Descendants are visited depth first; the children of each execution are
    queried ordered by start_timestamp.

    :param actionexecution_id: Id of the execution whose descendants are
                               returned.
    :param descendant_depth: Level at which to stop descending. Direct
                             children are level 1. The default of -1 imposes
                             no limit.
    :param result_fmt: Key into DESCENDANT_VIEWS selecting how the result is
                       presented. None and unknown keys fall back to
                       DFSDescendantView.

    :return: The ``result`` of the selected view.
    """
    descendants = DESCENDANT_VIEWS.get(result_fmt, DFSDescendantView)()
    children = list(ActionExecution.query(parent=actionexecution_id,
                                          order_by=['start_timestamp']))
    LOG.debug('Found %s children for id %s.', len(children), actionexecution_id)

    # Stack of (execution, level) with the next execution to visit at the
    # end. Children are pushed in reverse so the earliest one is popped first.
    pending = [(child, 1) for child in reversed(children)]

    while pending:
        parent, level = pending.pop()
        parent_id = str(parent.id)
        descendants.add(parent)
        if not parent.children:
            continue
        # Do not descend below the requested depth; -1 means unlimited.
        if descendant_depth != -1 and level == descendant_depth:
            continue
        children = list(ActionExecution.query(parent=parent_id,
                                              order_by=['start_timestamp']))
        LOG.debug('Found %s children for id %s.', len(children), parent_id)
        pending.extend((child, level + 1) for child in reversed(children))
    return descendants.result
222