1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
17 | # contributor license agreements. See the NOTICE file distributed with |
||
18 | # this work for additional information regarding copyright ownership. |
||
19 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
20 | # (the "License"); you may not use this file except in compliance with |
||
21 | # the License. You may obtain a copy of the License at |
||
22 | # |
||
23 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
24 | # |
||
25 | # Unless required by applicable law or agreed to in writing, software |
||
26 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
27 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
28 | # See the License for the specific language governing permissions and |
||
29 | # limitations under the License. |
||
30 | |||
31 | from oslo_config import cfg |
||
32 | import six |
||
33 | |||
34 | from st2common import log as logging |
||
35 | from st2common.util import date as date_utils |
||
36 | from st2common.util import reference |
||
37 | import st2common.util.action_db as action_utils |
||
38 | from st2common.constants import action as action_constants |
||
39 | from st2common.persistence.execution import ActionExecution |
||
40 | from st2common.persistence.runner import RunnerType |
||
41 | from st2common.persistence.rule import Rule |
||
42 | from st2common.persistence.trigger import TriggerType, Trigger, TriggerInstance |
||
43 | from st2common.models.api.action import RunnerTypeAPI, ActionAPI, LiveActionAPI |
||
44 | from st2common.models.api.rule import RuleAPI |
||
45 | from st2common.models.api.trigger import TriggerTypeAPI, TriggerAPI, TriggerInstanceAPI |
||
46 | from st2common.models.db.execution import ActionExecutionDB |
||
47 | from st2common.runners import utils as runners_utils |
||
48 | |||
49 | |||
50 | __all__ = [ |
||
51 | 'create_execution_object', |
||
52 | 'update_execution', |
||
53 | 'abandon_execution_if_incomplete', |
||
54 | 'is_execution_canceled', |
||
55 | 'AscendingSortedDescendantView', |
||
56 | 'DFSDescendantView', |
||
57 | 'get_descendants' |
||
58 | ] |
||
59 | |||
60 | LOG = logging.getLogger(__name__) |
||
61 | |||
62 | # Attributes which are stored in the "liveaction" dictionary when composing LiveActionDB object |
||
63 | # into a ActionExecution compatible dictionary. |
||
64 | # Those attributes are LiveAction specific and are therefore stored in a "liveaction" key |
||
65 | LIVEACTION_ATTRIBUTES = [ |
||
66 | 'id', |
||
67 | 'callback', |
||
68 | 'action', |
||
69 | 'action_is_workflow', |
||
70 | 'runner_info', |
||
71 | 'parameters', |
||
72 | 'notify' |
||
73 | ] |
||
74 | |||
75 | |||
76 | def _decompose_liveaction(liveaction_db): |
||
77 | """ |
||
78 | Splits the liveaction into an ActionExecution compatible dict. |
||
79 | """ |
||
80 | decomposed = {'liveaction': {}} |
||
81 | liveaction_api = vars(LiveActionAPI.from_model(liveaction_db)) |
||
82 | for k in liveaction_api.keys(): |
||
83 | if k in LIVEACTION_ATTRIBUTES: |
||
84 | decomposed['liveaction'][k] = liveaction_api[k] |
||
85 | else: |
||
86 | decomposed[k] = getattr(liveaction_db, k) |
||
87 | return decomposed |
||
88 | |||
89 | |||
90 | def _create_execution_log_entry(status): |
||
91 | """ |
||
92 | Create execution log entry object for the provided execution status. |
||
93 | """ |
||
94 | return { |
||
95 | 'timestamp': date_utils.get_datetime_utc_now(), |
||
96 | 'status': status |
||
97 | } |
||
98 | |||
99 | |||
100 | def create_execution_object(liveaction, publish=True): |
||
101 | action_db = action_utils.get_action_by_ref(liveaction.action) |
||
102 | runner = RunnerType.get_by_name(action_db.runner_type['name']) |
||
103 | |||
104 | attrs = { |
||
105 | 'action': vars(ActionAPI.from_model(action_db)), |
||
106 | 'parameters': liveaction['parameters'], |
||
107 | 'runner': vars(RunnerTypeAPI.from_model(runner)) |
||
108 | } |
||
109 | attrs.update(_decompose_liveaction(liveaction)) |
||
110 | |||
111 | if 'rule' in liveaction.context: |
||
112 | rule = reference.get_model_from_ref(Rule, liveaction.context.get('rule', {})) |
||
113 | attrs['rule'] = vars(RuleAPI.from_model(rule)) |
||
114 | |||
115 | if 'trigger_instance' in liveaction.context: |
||
116 | trigger_instance_id = liveaction.context.get('trigger_instance', {}) |
||
117 | trigger_instance_id = trigger_instance_id.get('id', None) |
||
118 | trigger_instance = TriggerInstance.get_by_id(trigger_instance_id) |
||
119 | trigger = reference.get_model_by_resource_ref(db_api=Trigger, |
||
120 | ref=trigger_instance.trigger) |
||
121 | trigger_type = reference.get_model_by_resource_ref(db_api=TriggerType, |
||
122 | ref=trigger.type) |
||
123 | trigger_instance = reference.get_model_from_ref( |
||
124 | TriggerInstance, liveaction.context.get('trigger_instance', {})) |
||
125 | attrs['trigger_instance'] = vars(TriggerInstanceAPI.from_model(trigger_instance)) |
||
126 | attrs['trigger'] = vars(TriggerAPI.from_model(trigger)) |
||
127 | attrs['trigger_type'] = vars(TriggerTypeAPI.from_model(trigger_type)) |
||
128 | |||
129 | parent = _get_parent_execution(liveaction) |
||
130 | if parent: |
||
131 | attrs['parent'] = str(parent.id) |
||
132 | |||
133 | attrs['log'] = [_create_execution_log_entry(liveaction['status'])] |
||
134 | |||
135 | execution = ActionExecutionDB(**attrs) |
||
136 | execution = ActionExecution.add_or_update(execution, publish=False) |
||
137 | |||
138 | # Update the web_url field in execution. Unfortunately, we need |
||
139 | # the execution id for constructing the URL which we only get |
||
140 | # after the model is written to disk. |
||
141 | execution.web_url = _get_web_url_for_execution(str(execution.id)) |
||
142 | execution = ActionExecution.add_or_update(execution, publish=publish) |
||
143 | |||
144 | if parent: |
||
145 | if str(execution.id) not in parent.children: |
||
146 | parent.children.append(str(execution.id)) |
||
147 | ActionExecution.add_or_update(parent) |
||
148 | |||
149 | return execution |
||
150 | |||
151 | |||
152 | def _get_parent_execution(child_liveaction_db): |
||
153 | parent_context = child_liveaction_db.context.get('parent', None) |
||
154 | |||
155 | if parent_context: |
||
156 | parent_id = parent_context['execution_id'] |
||
157 | try: |
||
158 | return ActionExecution.get_by_id(parent_id) |
||
159 | except: |
||
160 | LOG.exception('No valid execution object found in db for id: %s' % parent_id) |
||
0 ignored issues
–
show
Coding Style
Best Practice
introduced
by
![]() |
|||
161 | return None |
||
162 | return None |
||
163 | |||
164 | |||
165 | def _get_web_url_for_execution(execution_id): |
||
166 | base_url = cfg.CONF.webui.webui_base_url |
||
167 | return "%s/#/history/%s/general" % (base_url, execution_id) |
||
168 | |||
169 | |||
170 | def update_execution(liveaction_db, publish=True): |
||
171 | execution = ActionExecution.get(liveaction__id=str(liveaction_db.id)) |
||
172 | decomposed = _decompose_liveaction(liveaction_db) |
||
173 | |||
174 | kw = {} |
||
175 | for k, v in six.iteritems(decomposed): |
||
176 | kw['set__' + k] = v |
||
177 | |||
178 | if liveaction_db.status != execution.status: |
||
179 | # Note: If the status changes we store this transition in the "log" attribute of action |
||
180 | # execution |
||
181 | kw['push__log'] = _create_execution_log_entry(liveaction_db.status) |
||
182 | execution = ActionExecution.update(execution, publish=publish, **kw) |
||
183 | return execution |
||
184 | |||
185 | |||
186 | def abandon_execution_if_incomplete(liveaction_id, publish=True): |
||
187 | """ |
||
188 | Marks execution as abandoned if it is still incomplete. Abandoning an |
||
189 | execution implies that its end state is unknown and cannot anylonger |
||
190 | be determined. This method should only be called if the owning process |
||
191 | is certain it can no longer determine status of an execution. |
||
192 | """ |
||
193 | liveaction_db = action_utils.get_liveaction_by_id(liveaction_id) |
||
194 | |||
195 | # No need to abandon and already complete action |
||
196 | if liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES: |
||
197 | raise ValueError('LiveAction %s already in a completed state %s.' % |
||
198 | (liveaction_id, liveaction_db.status)) |
||
199 | |||
200 | # Update status to reflect execution being abandoned. |
||
201 | liveaction_db = action_utils.update_liveaction_status( |
||
202 | status=action_constants.LIVEACTION_STATUS_ABANDONED, |
||
203 | liveaction_db=liveaction_db, |
||
204 | result={}) |
||
205 | |||
206 | execution_db = update_execution(liveaction_db, publish=publish) |
||
207 | |||
208 | LOG.info('Marked execution %s as %s.', execution_db.id, |
||
209 | action_constants.LIVEACTION_STATUS_ABANDONED) |
||
210 | |||
211 | # Invoke post run on the action to execute post run operations such as callback. |
||
212 | runners_utils.invoke_post_run(liveaction_db) |
||
213 | |||
214 | return execution_db |
||
215 | |||
216 | |||
217 | def is_execution_canceled(execution_id): |
||
218 | try: |
||
219 | execution = ActionExecution.get_by_id(execution_id) |
||
220 | return execution.status == action_constants.LIVEACTION_STATUS_CANCELED |
||
221 | except: |
||
222 | return False # XXX: What to do here? |
||
223 | |||
224 | |||
225 | def get_parent_context(liveaction_db): |
||
226 | """ |
||
227 | Returns context of the parent execution. |
||
228 | |||
229 | :return: If found the parent context else None. |
||
230 | :rtype: dict |
||
231 | """ |
||
232 | context = getattr(liveaction_db, 'context', None) |
||
233 | if not context: |
||
234 | return None |
||
235 | return context.get('parent', None) |
||
236 | |||
237 | |||
238 | class AscendingSortedDescendantView(object): |
||
239 | def __init__(self): |
||
240 | self._result = [] |
||
241 | |||
242 | def add(self, child): |
||
243 | self._result.append(child) |
||
244 | |||
245 | @property |
||
246 | def result(self): |
||
247 | return sorted(self._result, key=lambda execution: execution.start_timestamp) |
||
248 | |||
249 | |||
250 | class DFSDescendantView(object): |
||
251 | def __init__(self): |
||
252 | self._result = [] |
||
253 | |||
254 | def add(self, child): |
||
255 | self._result.append(child) |
||
256 | |||
257 | @property |
||
258 | def result(self): |
||
259 | return self._result |
||
260 | |||
261 | |||
262 | DESCENDANT_VIEWS = { |
||
263 | 'sorted': AscendingSortedDescendantView, |
||
264 | 'default': DFSDescendantView |
||
265 | } |
||
266 | |||
267 | |||
268 | def get_descendants(actionexecution_id, descendant_depth=-1, result_fmt=None): |
||
269 | """ |
||
270 | Returns all descendant executions upto the specified descendant_depth for |
||
271 | the supplied actionexecution_id. |
||
272 | """ |
||
273 | descendants = DESCENDANT_VIEWS.get(result_fmt, DFSDescendantView)() |
||
274 | children = ActionExecution.query(parent=actionexecution_id, |
||
275 | **{'order_by': ['start_timestamp']}) |
||
276 | LOG.debug('Found %s children for id %s.', len(children), actionexecution_id) |
||
277 | current_level = [(child, 1) for child in children] |
||
278 | |||
279 | while current_level: |
||
280 | parent, level = current_level.pop(0) |
||
281 | parent_id = str(parent.id) |
||
282 | descendants.add(parent) |
||
283 | if not parent.children: |
||
284 | continue |
||
285 | if level != -1 and level == descendant_depth: |
||
286 | continue |
||
287 | children = ActionExecution.query(parent=parent_id, **{'order_by': ['start_timestamp']}) |
||
288 | LOG.debug('Found %s children for id %s.', len(children), parent_id) |
||
289 | # prepend for DFS |
||
290 | for idx in range(len(children)): |
||
291 | current_level.insert(idx, (children[idx], level + 1)) |
||
292 | return descendants.result |
||
293 |