1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | |||
18 | import kombu |
||
19 | |||
20 | from orquesta import events |
||
21 | from orquesta import states |
||
22 | |||
23 | from st2common.constants import action as ac_const |
||
24 | from st2common import log as logging |
||
25 | from st2common.models.db import execution as ex_db_models |
||
26 | from st2common.models.db import workflow as wf_db_models |
||
27 | from st2common.persistence import liveaction as lv_db_access |
||
28 | from st2common.persistence import workflow as wf_db_access |
||
29 | from st2common.services import policies as pc_svc |
||
30 | from st2common.services import workflows as wf_svc |
||
31 | from st2common.transport import consumers |
||
32 | from st2common.transport import queues |
||
33 | from st2common.transport import utils as txpt_utils |
||
34 | from st2common.metrics.base import CounterWithTimer |
||
35 | |||
36 | |||
37 | LOG = logging.getLogger(__name__) |
||
38 | |||
39 | |||
40 | WORKFLOW_EXECUTION_QUEUES = [ |
||
41 | queues.WORKFLOW_EXECUTION_WORK_QUEUE, |
||
42 | queues.WORKFLOW_EXECUTION_RESUME_QUEUE, |
||
43 | queues.WORKFLOW_ACTION_EXECUTION_UPDATE_QUEUE |
||
44 | ] |
||
45 | |||
46 | |||
47 | class WorkflowExecutionHandler(consumers.VariableMessageHandler): |
||
48 | |||
49 | def __init__(self, connection, queues): |
||
0 ignored issues
–
show
|
|||
50 | super(WorkflowExecutionHandler, self).__init__(connection, queues) |
||
51 | |||
52 | def handle_workflow_execution_with_instrumentation(wf_ex_db): |
||
53 | with CounterWithTimer(key='orquesta.workflow.executions'): |
||
54 | return self.handle_workflow_execution(wf_ex_db=wf_ex_db) |
||
55 | |||
56 | def handle_action_execution_with_instrumentation(ac_ex_db): |
||
57 | with CounterWithTimer(key='orquesta.action.executions'): |
||
58 | return self.handle_action_execution(ac_ex_db=ac_ex_db) |
||
59 | |||
60 | self.message_types = { |
||
61 | wf_db_models.WorkflowExecutionDB: handle_workflow_execution_with_instrumentation, |
||
62 | ex_db_models.ActionExecutionDB: handle_action_execution_with_instrumentation |
||
63 | } |
||
64 | |||
65 | def get_queue_consumer(self, connection, queues): |
||
0 ignored issues
–
show
queues is re-defining a name which is already available in the outer-scope (previously defined on line 32 ).
It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior: param = 5
class Foo:
def __init__(self, param): # "param" would be flagged here
self.param = param
Loading history...
|
|||
66 | # We want to use a special ActionsQueueConsumer which uses 2 dispatcher pools |
||
67 | return consumers.VariableMessageQueueConsumer( |
||
68 | connection=connection, |
||
69 | queues=queues, |
||
70 | handler=self |
||
71 | ) |
||
72 | |||
73 | def process(self, message): |
||
74 | handler_function = self.message_types.get(type(message)) |
||
75 | handler_function(message) |
||
76 | |||
77 | def handle_workflow_execution(self, wf_ex_db): |
||
78 | iteration = 0 |
||
79 | wf_ac_ex_id = wf_ex_db.action_execution |
||
80 | LOG.info('[%s] Processing request for workflow execution.', wf_ac_ex_id) |
||
81 | |||
82 | # Refresh record from the database in case the request is in the queue for too long. |
||
83 | conductor, wf_ex_db = wf_svc.refresh_conductor(str(wf_ex_db.id)) |
||
84 | |||
85 | # Continue if workflow is still active and set workflow to running state. |
||
86 | if conductor.get_workflow_state() not in states.COMPLETED_STATES: |
||
87 | LOG.info('[%s] Requesting conductor to start running workflow execution.', wf_ac_ex_id) |
||
88 | conductor.request_workflow_state(states.RUNNING) |
||
89 | |||
90 | # Identify the next set of tasks to execute. |
||
91 | msg = '[%s] Identifying next set (%s) of tasks for workflow execution in state "%s".' |
||
92 | LOG.info(msg, wf_ac_ex_id, str(iteration), conductor.get_workflow_state()) |
||
93 | LOG.debug('[%s] %s', wf_ac_ex_id, conductor.serialize()) |
||
94 | next_tasks = conductor.get_next_tasks() |
||
95 | |||
96 | # If there is no new tasks, update execution records to handle possible completion. |
||
97 | if not next_tasks: |
||
98 | # Update workflow execution and related liveaction and action execution. |
||
99 | LOG.info('[%s] No tasks identified to execute next.', wf_ac_ex_id) |
||
100 | wf_svc.update_execution_records(wf_ex_db, conductor) |
||
101 | |||
102 | # If workflow execution is no longer active, then stop processing here. |
||
103 | if wf_ex_db.status in states.COMPLETED_STATES: |
||
104 | msg = '[%s] Workflow execution is in completed state "%s".' |
||
105 | LOG.info(msg, wf_ac_ex_id, wf_ex_db.status) |
||
106 | return |
||
107 | |||
108 | # Iterate while there are next tasks identified for processing. In the case for |
||
109 | # task with no action execution defined, the task execution will complete |
||
110 | # immediately with a new set of tasks available. |
||
111 | while next_tasks: |
||
112 | msg = '[%s] Identified the following set of tasks to execute next: %s' |
||
113 | LOG.info(msg, wf_ac_ex_id, ', '.join([task['id'] for task in next_tasks])) |
||
114 | |||
115 | # Mark the tasks as running in the task flow before actual task execution. |
||
116 | for task in next_tasks: |
||
117 | msg = '[%s] Mark task "%s" in conductor as running.' |
||
118 | LOG.info(msg, wf_ac_ex_id, task['id']) |
||
119 | ac_ex_event = events.ActionExecutionEvent(states.RUNNING) |
||
120 | conductor.update_task_flow(task['id'], ac_ex_event) |
||
121 | |||
122 | # Update workflow execution and related liveaction and action execution. |
||
123 | wf_svc.update_execution_records(wf_ex_db, conductor) |
||
124 | |||
125 | # If workflow execution is no longer active, then stop processing here. |
||
126 | if wf_ex_db.status in states.COMPLETED_STATES: |
||
127 | msg = '[%s] Workflow execution is in completed state "%s".' |
||
128 | LOG.info(msg, wf_ac_ex_id, wf_ex_db.status) |
||
129 | break |
||
130 | |||
131 | # Request task execution for the tasks. |
||
132 | for task in next_tasks: |
||
133 | try: |
||
134 | LOG.info('[%s] Requesting execution for task "%s".', wf_ac_ex_id, task['id']) |
||
135 | task_id, task_spec, task_ctx = task['id'], task['spec'], task['ctx'] |
||
136 | st2_ctx = {'execution_id': wf_ex_db.action_execution} |
||
137 | wf_svc.request_task_execution(wf_ex_db, task_id, task_spec, task_ctx, st2_ctx) |
||
138 | except Exception as e: |
||
139 | LOG.exception('[%s] Failed task execution for "%s".', wf_ac_ex_id, task['id']) |
||
140 | wf_svc.fail_workflow_execution(str(wf_ex_db.id), e, task_id=task['id']) |
||
141 | return |
||
142 | |||
143 | # Identify the next set of tasks to execute. |
||
144 | iteration += 1 |
||
145 | conductor, wf_ex_db = wf_svc.refresh_conductor(str(wf_ex_db.id)) |
||
146 | msg = '[%s] Identifying next set (%s) of tasks for workflow execution in state "%s".' |
||
147 | LOG.info(msg, wf_ac_ex_id, str(iteration), conductor.get_workflow_state()) |
||
148 | LOG.debug('[%s] %s', wf_ac_ex_id, conductor.serialize()) |
||
149 | next_tasks = conductor.get_next_tasks() |
||
150 | |||
151 | if not next_tasks: |
||
152 | LOG.info('[%s] No tasks identified to execute next.', wf_ac_ex_id) |
||
153 | |||
154 | def handle_action_execution(self, ac_ex_db): |
||
155 | # Exit if action execution is not executed under an orquesta workflow. |
||
156 | if 'orquesta' not in ac_ex_db.context: |
||
157 | return |
||
158 | |||
159 | # Get related record identifiers. |
||
160 | wf_ex_id = ac_ex_db.context['orquesta']['workflow_execution_id'] |
||
161 | task_ex_id = ac_ex_db.context['orquesta']['task_execution_id'] |
||
162 | |||
163 | # Get execution records for logging purposes. |
||
164 | wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id) |
||
165 | task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id) |
||
166 | |||
167 | wf_ac_ex_id = wf_ex_db.action_execution |
||
168 | msg = '[%s] Action execution "%s" for task "%s" is updated and in "%s" state.' |
||
169 | LOG.info(msg, wf_ac_ex_id, str(ac_ex_db.id), task_ex_db.task_id, ac_ex_db.status) |
||
170 | |||
171 | # Skip if task execution is already in completed state. |
||
172 | if task_ex_db.status in states.COMPLETED_STATES: |
||
173 | LOG.info( |
||
174 | '[%s] Action execution "%s" for task "%s" is not processed because ' |
||
175 | 'task execution "%s" is already in completed state "%s".', |
||
176 | wf_ac_ex_id, |
||
177 | str(ac_ex_db.id), |
||
178 | task_ex_db.task_id, |
||
179 | str(task_ex_db.id), |
||
180 | task_ex_db.status |
||
181 | ) |
||
182 | |||
183 | return |
||
184 | |||
185 | # Process pending request on the action execution. |
||
186 | if ac_ex_db.status == ac_const.LIVEACTION_STATUS_PENDING: |
||
187 | wf_svc.handle_action_execution_pending(ac_ex_db) |
||
188 | return |
||
189 | |||
190 | # Process pause request on the action execution. |
||
191 | if ac_ex_db.status == ac_const.LIVEACTION_STATUS_PAUSED: |
||
192 | wf_svc.handle_action_execution_pause(ac_ex_db) |
||
193 | return |
||
194 | |||
195 | # Exit if action execution has not completed yet. |
||
196 | if ac_ex_db.status not in ac_const.LIVEACTION_COMPLETED_STATES: |
||
197 | return |
||
198 | |||
199 | # Apply post run policies. |
||
200 | lv_ac_db = lv_db_access.LiveAction.get_by_id(ac_ex_db.liveaction['id']) |
||
201 | pc_svc.apply_post_run_policies(lv_ac_db) |
||
202 | |||
203 | # Process completion of the action execution. |
||
204 | wf_svc.handle_action_execution_completion(ac_ex_db) |
||
205 | |||
206 | |||
207 | def get_engine(): |
||
208 | with kombu.Connection(txpt_utils.get_messaging_urls()) as conn: |
||
209 | return WorkflowExecutionHandler(conn, WORKFLOW_EXECUTION_QUEUES) |
||
210 |
It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior: