1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
17 | # contributor license agreements. See the NOTICE file distributed with |
||
18 | # this work for additional information regarding copyright ownership. |
||
19 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
20 | # (the "License"); you may not use this file except in compliance with |
||
21 | # the License. You may obtain a copy of the License at |
||
22 | # |
||
23 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
24 | # |
||
25 | # Unless required by applicable law or agreed to in writing, software |
||
26 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
27 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
28 | # See the License for the specific language governing permissions and |
||
29 | # limitations under the License. |
||
30 | |||
31 | from __future__ import absolute_import |
||
32 | from oslo_config import cfg |
||
33 | import six |
||
34 | |||
35 | from st2common import log as logging |
||
36 | from st2common.util import date as date_utils |
||
37 | from st2common.util import reference |
||
38 | import st2common.util.action_db as action_utils |
||
39 | from st2common.constants import action as action_constants |
||
40 | from st2common.persistence.execution import ActionExecution |
||
41 | from st2common.persistence.runner import RunnerType |
||
42 | from st2common.persistence.rule import Rule |
||
43 | from st2common.persistence.trigger import TriggerType, Trigger, TriggerInstance |
||
44 | from st2common.models.api.action import RunnerTypeAPI, ActionAPI, LiveActionAPI |
||
45 | from st2common.models.api.rule import RuleAPI |
||
46 | from st2common.models.api.trigger import TriggerTypeAPI, TriggerAPI, TriggerInstanceAPI |
||
47 | from st2common.models.db.execution import ActionExecutionDB |
||
48 | from st2common.runners import utils as runners_utils |
||
49 | from six.moves import range |
||
50 | |||
51 | |||
# Names exported by ``from <this module> import *``.
__all__ = [
    'create_execution_object',
    'update_execution',
    'abandon_execution_if_incomplete',
    'is_execution_canceled',
    'AscendingSortedDescendantView',
    'DFSDescendantView',
    'get_descendants'
]

# Module level logger shared by the functions below.
LOG = logging.getLogger(__name__)
||
63 | |||
# LiveAction specific attributes. When a LiveActionDB object is decomposed into an
# ActionExecution compatible dictionary, these attributes are stored under the nested
# "liveaction" key instead of at the top level.
LIVEACTION_ATTRIBUTES = [
    'id',
    'callback',
    'action',
    'action_is_workflow',
    'runner_info',
    'parameters',
    'notify',
]
||
76 | |||
77 | |||
def _decompose_liveaction(liveaction_db):
    """
    Build an ActionExecution compatible dict from the provided liveaction.

    Attributes listed in LIVEACTION_ATTRIBUTES are copied from the API representation of
    the liveaction into the nested "liveaction" dict. Every other attribute exposed by the
    API representation is read from the DB object itself and stored at the top level.

    :param liveaction_db: Liveaction object to decompose.

    :rtype: ``dict``
    """
    api_attrs = vars(LiveActionAPI.from_model(liveaction_db))
    result = {'liveaction': {}}

    for name, api_value in api_attrs.items():
        if name not in LIVEACTION_ATTRIBUTES:
            # Not liveaction specific - take the value straight from the DB object.
            result[name] = getattr(liveaction_db, name)
            continue

        result['liveaction'][name] = api_value

    return result
||
90 | |||
91 | |||
def _create_execution_log_entry(status):
    """
    Build a log entry for the provided execution status.

    :param status: Execution status to record.

    :return: Dict with the current UTC time under "timestamp" and the status under "status".
    :rtype: ``dict``
    """
    entry = dict(timestamp=date_utils.get_datetime_utc_now(), status=status)
    return entry
||
100 | |||
101 | |||
def create_execution_object(liveaction, publish=True):
    """
    Create and persist the ActionExecution object for the provided liveaction.

    The execution is composed from the action, the runner, the decomposed liveaction and,
    when referenced in the liveaction context, the rule, trigger instance, trigger and
    trigger type. If the context references a parent execution, the new execution id is
    appended to the children of that parent.

    :param liveaction: Liveaction to create the execution for.
    :param publish: Passed to the second (final) write of the execution object. The first
                    write always uses ``publish=False``.

    :return: The persisted execution object.
    """
    action_db = action_utils.get_action_by_ref(liveaction.action)
    runner_db = RunnerType.get_by_name(action_db.runner_type['name'])

    execution_attrs = {
        'action': vars(ActionAPI.from_model(action_db)),
        'parameters': liveaction['parameters'],
        'runner': vars(RunnerTypeAPI.from_model(runner_db))
    }
    execution_attrs.update(_decompose_liveaction(liveaction))

    context = liveaction.context

    if 'rule' in context:
        rule_db = reference.get_model_from_ref(Rule, context.get('rule', {}))
        execution_attrs['rule'] = vars(RuleAPI.from_model(rule_db))

    if 'trigger_instance' in context:
        trigger_instance_ref = context.get('trigger_instance', {})
        trigger_instance_db = TriggerInstance.get_by_id(trigger_instance_ref.get('id', None))
        trigger_db = reference.get_model_by_resource_ref(db_api=Trigger,
                                                         ref=trigger_instance_db.trigger)
        trigger_type_db = reference.get_model_by_resource_ref(db_api=TriggerType,
                                                              ref=trigger_db.type)
        # NOTE(review): the trigger instance is retrieved a second time via the reference
        # helper. This looks redundant with the get_by_id call above - confirm before
        # removing.
        trigger_instance_db = reference.get_model_from_ref(
            TriggerInstance, context.get('trigger_instance', {}))
        execution_attrs['trigger_instance'] = vars(
            TriggerInstanceAPI.from_model(trigger_instance_db))
        execution_attrs['trigger'] = vars(TriggerAPI.from_model(trigger_db))
        execution_attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type_db))

    parent = _get_parent_execution(liveaction)
    if parent:
        execution_attrs['parent'] = str(parent.id)

    execution_attrs['log'] = [_create_execution_log_entry(liveaction['status'])]

    # First write: needed to obtain the execution id, hence never published.
    execution = ActionExecution.add_or_update(ActionExecutionDB(**execution_attrs),
                                              publish=False)

    # The web_url field can only be built once the id is known, which is only the case
    # after the model has been written to disk - so the object is written a second time.
    execution.web_url = _get_web_url_for_execution(str(execution.id))
    execution = ActionExecution.add_or_update(execution, publish=publish)

    if parent:
        child_id = str(execution.id)
        if child_id not in parent.children:
            # Register the new execution with its parent.
            parent.children.append(child_id)
            ActionExecution.add_or_update(parent)

    return execution
||
152 | |||
153 | |||
def _get_parent_execution(child_liveaction_db):
    """
    Retrieve the parent execution referenced by the context of the provided liveaction.

    :param child_liveaction_db: Liveaction whose ``context`` may contain a "parent" entry
                                with an "execution_id" key.

    :return: The parent execution object, or None if the context has no parent entry or
             the lookup by id fails.
    """
    parent_context = child_liveaction_db.context.get('parent', None)

    if not parent_context:
        return None

    parent_id = parent_context['execution_id']
    try:
        return ActionExecution.get_by_id(parent_id)
    except Exception:
        # Catch Exception rather than using a bare "except:" so that SystemExit,
        # KeyboardInterrupt and other BaseException based signals still propagate.
        LOG.exception('No valid execution object found in db for id: %s', parent_id)
        return None
||
165 | |||
166 | |||
def _get_web_url_for_execution(execution_id):
    """
    Build the web UI URL for the provided execution id.

    :param execution_id: Id of the execution.

    :return: URL composed of the configured ``webui.webui_base_url`` and the execution id.
    :rtype: ``str``
    """
    url_parts = (cfg.CONF.webui.webui_base_url, execution_id)
    return "%s/#/history/%s/general" % url_parts
||
170 | |||
171 | |||
def update_execution(liveaction_db, publish=True):
    """
    Update the execution object which belongs to the provided liveaction.

    Every attribute of the decomposed liveaction is set on the execution. If the liveaction
    status differs from the stored execution status, a new entry describing the transition
    is additionally pushed to the execution "log" attribute.

    :param liveaction_db: Liveaction whose state should be reflected on the execution.
    :param publish: Passed through to ``ActionExecution.update``.

    :return: The updated execution object.
    """
    execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))
    decomposed = _decompose_liveaction(liveaction_db)

    update_kwargs = dict(('set__' + field, value)
                         for field, value in six.iteritems(decomposed))

    status_changed = liveaction_db.status != execution.status
    if status_changed:
        # Record the status transition in the execution log.
        update_kwargs['push__log'] = _create_execution_log_entry(liveaction_db.status)

    return ActionExecution.update(execution, publish=publish, **update_kwargs)
||
186 | |||
187 | |||
def abandon_execution_if_incomplete(liveaction_id, publish=True):
    """
    Mark an execution as abandoned if it is still incomplete.

    Abandoning an execution implies that its end state is unknown and can no longer be
    determined. This function should only be called if the owning process is certain it
    can no longer determine the status of an execution.

    :param liveaction_id: Id of the liveaction to abandon.
    :param publish: Passed through to ``update_execution``.

    :return: The updated execution object.

    :raises ValueError: If the liveaction is already in a completed state.
    """
    abandoned_status = action_constants.LIVEACTION_STATUS_ABANDONED
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)

    # An already completed action can't be abandoned.
    already_completed = liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES
    if already_completed:
        raise ValueError('LiveAction %s already in a completed state %s.' %
                         (liveaction_id, liveaction_db.status))

    # Reflect the abandoned state on the liveaction first and then on the execution.
    liveaction_db = action_utils.update_liveaction_status(status=abandoned_status,
                                                          liveaction_db=liveaction_db,
                                                          result={})
    execution_db = update_execution(liveaction_db, publish=publish)

    LOG.info('Marked execution %s as %s.', execution_db.id, abandoned_status)

    # Run the post run operations (such as callback) of the action.
    runners_utils.invoke_post_run(liveaction_db)

    return execution_db
||
217 | |||
218 | |||
def is_execution_canceled(execution_id):
    """
    Check whether the execution with the provided id has been canceled.

    :param execution_id: Id of the execution to check.

    :return: True if the execution status is canceled. False if it is not, or if the
             execution could not be retrieved.
    :rtype: ``bool``
    """
    try:
        execution = ActionExecution.get_by_id(execution_id)
    except Exception:
        # Best effort: a failed lookup is treated as "not canceled". Exception is caught
        # instead of using a bare "except:" so SystemExit / KeyboardInterrupt propagate.
        return False  # XXX: What to do here?

    return execution.status == action_constants.LIVEACTION_STATUS_CANCELED
||
225 | |||
226 | |||
def get_parent_context(liveaction_db):
    """
    Return the "parent" entry of the context of the provided liveaction.

    :param liveaction_db: Object with an optional ``context`` attribute.

    :return: The parent context if found, else None (missing / empty context or no
             "parent" key).
    :rtype: dict
    """
    ctx = getattr(liveaction_db, 'context', None)
    return ctx.get('parent', None) if ctx else None
||
238 | |||
239 | |||
class AscendingSortedDescendantView(object):
    """
    Collects descendant executions and exposes them sorted by ``start_timestamp``
    in ascending order.
    """

    def __init__(self):
        # Executions in the order they were added.
        self._result = []

    def add(self, child):
        """
        Add an execution to the view.
        """
        self._result.append(child)

    @property
    def result(self):
        """
        Return a new list with the collected executions ordered by ``start_timestamp``.
        """
        ordered = list(self._result)
        ordered.sort(key=lambda item: item.start_timestamp)
        return ordered
||
250 | |||
251 | |||
class DFSDescendantView(object):
    """
    Collects descendant executions and exposes them in the order they were added.
    """

    def __init__(self):
        # Executions in insertion order.
        self._result = []

    def add(self, child):
        """
        Add an execution to the view.
        """
        self._result += [child]

    @property
    def result(self):
        """
        Return the internal list of collected executions (not a copy).
        """
        return self._result
||
262 | |||
263 | |||
# Maps a result format name to the view class used to collect descendant executions.
DESCENDANT_VIEWS = dict(
    sorted=AscendingSortedDescendantView,
    default=DFSDescendantView
)
||
268 | |||
269 | |||
def get_descendants(actionexecution_id, descendant_depth=-1, result_fmt=None):
    """
    Return all descendant executions up to the specified descendant_depth for
    the supplied actionexecution_id.

    Descendants are visited depth first; the children of each execution are queried
    ordered by ``start_timestamp``.

    :param actionexecution_id: Id of the execution whose descendants should be returned.
    :param descendant_depth: Level at which to stop descending (direct children are
                             level 1). With the default of -1 the depth is not limited.
    :param result_fmt: Key into DESCENDANT_VIEWS selecting how the result is presented.
                       Unknown values (including None) fall back to DFSDescendantView.

    :return: The ``result`` of the selected descendant view.
    """
    view_cls = DESCENDANT_VIEWS.get(result_fmt, DFSDescendantView)
    descendants = view_cls()

    children = ActionExecution.query(parent=actionexecution_id,
                                     **{'order_by': ['start_timestamp']})
    LOG.debug('Found %s children for id %s.', len(children), actionexecution_id)

    # Work list of (execution, level) tuples; the front of the list is processed next.
    pending = [(child, 1) for child in children]

    while pending:
        execution, level = pending.pop(0)
        descendants.add(execution)

        # Levels start at 1 and only grow, so with descendant_depth=-1 the depth check
        # never matches and the traversal is unbounded.
        if not execution.children or level == descendant_depth:
            continue

        execution_id = str(execution.id)
        children = ActionExecution.query(parent=execution_id,
                                         **{'order_by': ['start_timestamp']})
        LOG.debug('Found %s children for id %s.', len(children), execution_id)

        # Prepend the children (keeping their order) so they are visited before the
        # remaining siblings - this is what makes the traversal depth first.
        pending[:0] = [(child, level + 1) for child in children]

    return descendants.result
||
295 |