Completed
Push — master ( 5da451...6b0abf )
by Manas
07:55
created

st2common.services.abandon_execution_if_incomplete()   A

Complexity

Conditions 2

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 20
rs 9.4286
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
17
# contributor license agreements.  See the NOTICE file distributed with
18
# this work for additional information regarding copyright ownership.
19
# The ASF licenses this file to You under the Apache License, Version 2.0
20
# (the "License"); you may not use this file except in compliance with
21
# the License.  You may obtain a copy of the License at
22
#
23
#     http://www.apache.org/licenses/LICENSE-2.0
24
#
25
# Unless required by applicable law or agreed to in writing, software
26
# distributed under the License is distributed on an "AS IS" BASIS,
27
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28
# See the License for the specific language governing permissions and
29
# limitations under the License.
30
31
import six
32
33
from st2common import log as logging
34
from st2common.util import reference
35
import st2common.util.action_db as action_utils
36
from st2common.constants import action as action_constants
37
from st2common.persistence.execution import ActionExecution
38
from st2common.persistence.runner import RunnerType
39
from st2common.persistence.rule import Rule
40
from st2common.persistence.trigger import TriggerType, Trigger, TriggerInstance
41
from st2common.models.api.action import RunnerTypeAPI, ActionAPI, LiveActionAPI
42
from st2common.models.api.rule import RuleAPI
43
from st2common.models.api.trigger import TriggerTypeAPI, TriggerAPI, TriggerInstanceAPI
44
from st2common.models.db.execution import ActionExecutionDB
45
46
# Public names exported by ``from <this module> import *``.
__all__ = [
    'create_execution_object',
    'update_execution',
    'abandon_execution_if_incomplete',
    'is_execution_canceled',
    'AscendingSortedDescendantView',
    'DFSDescendantView',
    'get_descendants',
]

# Module-level logger.
LOG = logging.getLogger(__name__)

# LiveAction API fields that _decompose_liveaction() nests under the
# 'liveaction' key of its result instead of placing them at the top level.
SKIPPED = ['id', 'callback', 'action', 'runner_info', 'parameters']
59
60
61
def _decompose_liveaction(liveaction_db):
    """
    Splits the liveaction into an ActionExecution compatible dict.

    Fields listed in SKIPPED are copied, using their LiveActionAPI values,
    into the nested ``'liveaction'`` dict. Every other API field becomes a
    top-level key whose value is read from ``liveaction_db`` itself.
    """
    api_fields = vars(LiveActionAPI.from_model(liveaction_db))
    decomposed = {'liveaction': {}}

    for field_name, api_value in api_fields.items():
        if field_name not in SKIPPED:
            # Top-level values come from the DB model, not the API object.
            decomposed[field_name] = getattr(liveaction_db, field_name)
            continue
        decomposed['liveaction'][field_name] = api_value

    return decomposed
73
74
75
def create_execution_object(liveaction, publish=True):
    """
    Builds and stores the ActionExecution record for a liveaction.

    The record combines the API representations of the action and its
    runner, the liveaction parameters, the decomposed liveaction and, when
    the liveaction context names them, the rule, trigger instance, trigger
    and trigger type. If the context references a parent execution, the new
    execution id is also appended to that parent's ``children``.

    :param liveaction: Liveaction to create the execution for.
    :param publish: Forwarded to ``ActionExecution.add_or_update`` when the
                    new execution is stored.

    :return: The stored execution object.
    """
    action_db = action_utils.get_action_by_ref(liveaction.action)
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])

    execution_attrs = {
        'action': vars(ActionAPI.from_model(action_db)),
        'parameters': liveaction['parameters'],
        'runner': vars(RunnerTypeAPI.from_model(runner_db)),
    }
    execution_attrs.update(_decompose_liveaction(liveaction))

    if 'rule' in liveaction.context:
        rule_ref = liveaction.context.get('rule', {})
        rule_db = reference.get_model_from_ref(Rule, rule_ref)
        execution_attrs['rule'] = vars(RuleAPI.from_model(rule_db))

    if 'trigger_instance' in liveaction.context:
        instance_ref = liveaction.context.get('trigger_instance', {})
        instance_db = TriggerInstance.get_by_id(instance_ref.get('id', None))
        trigger_db = reference.get_model_by_resource_ref(db_api=Trigger,
                                                         ref=instance_db.trigger)
        trigger_type_db = reference.get_model_by_resource_ref(db_api=TriggerType,
                                                              ref=trigger_db.type)
        # NOTE(review): the trigger instance is looked up a second time here,
        # now through the context reference; this looks redundant with the
        # get_by_id call above -- confirm before collapsing the two.
        instance_db = reference.get_model_from_ref(
            TriggerInstance, liveaction.context.get('trigger_instance', {}))
        execution_attrs['trigger_instance'] = vars(TriggerInstanceAPI.from_model(instance_db))
        execution_attrs['trigger'] = vars(TriggerAPI.from_model(trigger_db))
        execution_attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type_db))

    parent = _get_parent_execution(liveaction)
    if parent:
        execution_attrs['parent'] = str(parent.id)

    execution = ActionExecution.add_or_update(ActionExecutionDB(**execution_attrs),
                                              publish=publish)

    # Register the new execution with its parent only once.
    if parent and str(execution.id) not in parent.children:
        parent.children.append(str(execution.id))
        ActionExecution.add_or_update(parent)

    return execution
117
118
119
def _get_parent_execution(child_liveaction_db):
    """
    Looks up the parent execution referenced by a child liveaction.

    :param child_liveaction_db: Liveaction whose ``context`` may contain a
                                ``parent`` entry holding an ``execution_id``.

    :return: The parent execution, or ``None`` when the context has no
             (truthy) ``parent`` entry or the lookup raises.
    """
    parent_context = child_liveaction_db.context.get('parent', None)
    if not parent_context:
        return None

    parent_id = parent_context['execution_id']
    try:
        return ActionExecution.get_by_id(parent_id)
    except Exception:
        # Best effort: a failed lookup is logged and treated as "no parent".
        # Only Exception is caught so KeyboardInterrupt / SystemExit still
        # propagate. The id is passed as a logging argument so formatting is
        # left to the logging call.
        LOG.exception('No valid execution object found in db for id: %s', parent_id)
        return None
130
131
132
def update_execution(liveaction_db, publish=True):
    """
    Copies the current state of a liveaction onto its stored execution.

    The execution is fetched by the liveaction id, every key produced by
    _decompose_liveaction() is set as an attribute on it, and the result is
    stored again.

    :param liveaction_db: Liveaction whose state should be reflected.
    :param publish: Forwarded to ``ActionExecution.add_or_update``.

    :return: The stored execution object.
    """
    execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
    for attr_name, attr_value in six.iteritems(_decompose_liveaction(liveaction_db)):
        setattr(execution_db, attr_name, attr_value)
    return ActionExecution.add_or_update(execution_db, publish=publish)
139
140
141
def abandon_execution_if_incomplete(liveaction_id, publish=True):
    """
    Marks execution as abandoned if it is still incomplete. Abandoning an
    execution implies that its end state is unknown and can no longer be
    determined. This method should only be called if the owning process
    is certain it can no longer determine status of an execution.

    :param liveaction_id: Id of the liveaction to abandon.
    :param publish: Forwarded to update_execution().

    :return: The updated execution object.
    :raises ValueError: If the liveaction is already in one of the
                        LIVEACTION_COMPLETED_STATES.
    """
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)

    # An action that already completed must not be abandoned.
    current_status = liveaction_db.status
    if current_status in action_constants.LIVEACTION_COMPLETED_STATES:
        message = 'LiveAction %s already in a completed state %s.' % (liveaction_id,
                                                                      current_status)
        raise ValueError(message)

    abandoned_status = action_constants.LIVEACTION_STATUS_ABANDONED
    liveaction_db = action_utils.update_liveaction_status(status=abandoned_status,
                                                          liveaction_db=liveaction_db,
                                                          result={})
    execution_db = update_execution(liveaction_db, publish=publish)
    LOG.info('Marked execution %s as %s.', execution_db.id, abandoned_status)
    return execution_db
161
162
163
def is_execution_canceled(execution_id):
    """
    Tells whether the execution with the given id has been canceled.

    :param execution_id: Id of the execution to check.

    :return: ``True`` only when the execution is found and its status equals
             ``LIVEACTION_STATUS_CANCELED``; ``False`` otherwise, including
             when the lookup or the status check raises.
    """
    try:
        execution = ActionExecution.get_by_id(execution_id)
        return execution.status == action_constants.LIVEACTION_STATUS_CANCELED
    except Exception:
        # Deliberately best effort. Only Exception is caught so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return False  # XXX: What to do here?
169
170
171
class AscendingSortedDescendantView(object):
    """
    Collects executions and exposes them ordered by ascending
    ``start_timestamp``.
    """

    def __init__(self):
        # Executions in the order they were added.
        self._result = []

    def add(self, child):
        """Records one more execution."""
        self._result.append(child)

    @property
    def result(self):
        """New list of the added executions, ascending by start_timestamp."""
        ordered = list(self._result)
        ordered.sort(key=lambda item: item.start_timestamp)
        return ordered
181
182
183
class DFSDescendantView(object):
    """
    Collects executions and exposes them in exactly the order they were
    added (get_descendants() adds them in depth-first order).
    """

    def __init__(self):
        # Executions in insertion order.
        self._result = []

    def add(self, child):
        """Records one more execution."""
        self._result.append(child)

    @property
    def result(self):
        """
        New list of the added executions in insertion order.

        A copy is returned, as AscendingSortedDescendantView.result also
        does, so callers that modify the returned list cannot corrupt the
        view's internal state.
        """
        return list(self._result)
193
194
195
# Maps the ``result_fmt`` value accepted by get_descendants() to the view
# class used to collect and present the descendants.
DESCENDANT_VIEWS = {
    'sorted': AscendingSortedDescendantView,
    'default': DFSDescendantView,
}
199
200
201
def get_descendants(actionexecution_id, descendant_depth=-1, result_fmt=None):
    """
    Returns all descendant executions up to the specified descendant_depth for
    the supplied actionexecution_id.

    :param actionexecution_id: Id of the execution whose descendants are
                               collected.
    :param descendant_depth: Depth at which the walk stops descending; direct
                             children are at depth 1. The default of -1 never
                             matches a depth, so the whole tree is walked.
    :param result_fmt: Key into DESCENDANT_VIEWS selecting how the result is
                       presented; any other value (including ``None``) falls
                       back to DFSDescendantView.

    :return: The ``result`` of the selected view.
    """
    descendants = DESCENDANT_VIEWS.get(result_fmt, DFSDescendantView)()
    children = list(ActionExecution.query(parent=actionexecution_id,
                                          order_by=['start_timestamp']))
    LOG.debug('Found %s children for id %s.', len(children), actionexecution_id)

    # Depth-first walk using a stack whose top is the END of the list.
    # Children are pushed in reverse so the earliest child is popped first,
    # which yields the same visit order as prepending to a queue but without
    # the O(n) cost of list.pop(0) / list.insert(idx, ...).
    pending = [(child, 1) for child in reversed(children)]

    while pending:
        execution, level = pending.pop()
        descendants.add(execution)
        if not execution.children:
            continue
        # Levels start at 1, so this never matches the default depth of -1.
        if level == descendant_depth:
            continue
        execution_id = str(execution.id)
        children = list(ActionExecution.query(parent=execution_id,
                                              order_by=['start_timestamp']))
        LOG.debug('Found %s children for id %s.', len(children), execution_id)
        pending.extend((child, level + 1) for child in reversed(children))

    return descendants.result
226