Test Failed
Push — master ( 21460f...e380d0 )
by Tomaz
01:48
created

st2actions/st2actions/workflows/workflows.py (2 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import kombu
19
20
from orquesta import events
21
from orquesta import states
22
23
from st2common.constants import action as ac_const
24
from st2common import log as logging
25
from st2common.models.db import execution as ex_db_models
26
from st2common.models.db import workflow as wf_db_models
27
from st2common.persistence import liveaction as lv_db_access
28
from st2common.persistence import workflow as wf_db_access
29
from st2common.services import policies as pc_svc
30
from st2common.services import workflows as wf_svc
31
from st2common.transport import consumers
32
from st2common.transport import queues
33
from st2common.transport import utils as txpt_utils
34
from st2common.metrics.base import CounterWithTimer
35
36
37
LOG = logging.getLogger(__name__)


# Queues handed to WorkflowExecutionHandler when the engine is built in get_engine().
WORKFLOW_EXECUTION_QUEUES = [
    queues.WORKFLOW_EXECUTION_WORK_QUEUE,
    queues.WORKFLOW_EXECUTION_RESUME_QUEUE,
    queues.WORKFLOW_ACTION_EXECUTION_UPDATE_QUEUE,
]
45
46
47
class WorkflowExecutionHandler(consumers.VariableMessageHandler):
48
49
    def __init__(self, connection, queues):
        """Set up the handler and register one instrumented callback per message type.

        NOTE: the ``queues`` parameter shadows the module-level ``queues`` import
        inside this method. The name is kept as-is so that existing callers are
        not affected.

        :param connection: Messaging connection forwarded to the parent handler.
        :param queues: Queues forwarded to the parent handler.
        """
        super(WorkflowExecutionHandler, self).__init__(connection, queues)

        def timed_workflow_execution_handler(wf_ex_db):
            # Run the workflow execution handler inside the metrics context manager.
            with CounterWithTimer(key='orquesta.workflow.executions'):
                return self.handle_workflow_execution(wf_ex_db=wf_ex_db)

        def timed_action_execution_handler(ac_ex_db):
            # Run the action execution handler inside the metrics context manager.
            with CounterWithTimer(key='orquesta.action.executions'):
                return self.handle_action_execution(ac_ex_db=ac_ex_db)

        # Dispatch table keyed on the message's type; process() looks handlers up here.
        dispatch = {}
        dispatch[wf_db_models.WorkflowExecutionDB] = timed_workflow_execution_handler
        dispatch[ex_db_models.ActionExecutionDB] = timed_action_execution_handler
        self.message_types = dispatch
64
65
    def get_queue_consumer(self, connection, queues):
        """Create the queue consumer that feeds messages to this handler.

        NOTE: the ``queues`` parameter shadows the module-level ``queues`` import
        inside this method. The name is kept as-is so that existing callers are
        not affected.

        NOTE(review): the previous comment here referred to a special
        ActionsQueueConsumer which uses 2 dispatcher pools, but the code
        instantiates ``consumers.VariableMessageQueueConsumer`` - confirm which
        one is intended.

        :param connection: Messaging connection the consumer is created with.
        :param queues: Queues the consumer is created with.
        :returns: A ``consumers.VariableMessageQueueConsumer`` using this handler.
        """
        consumer_cls = consumers.VariableMessageQueueConsumer
        return consumer_cls(connection=connection, queues=queues, handler=self)
72
73
    def process(self, message):
        """Dispatch ``message`` to the handler registered for its exact type.

        The handler is looked up in ``self.message_types`` using ``type(message)``.

        :param message: Message received from one of the consumed queues.
        :raises ValueError: If no handler is registered for ``type(message)``.
        """
        message_type = type(message)
        handler_function = self.message_types.get(message_type)

        # Fail with an explicit error rather than the opaque
        # "'NoneType' object is not callable" raised by calling a missing handler.
        if handler_function is None:
            msg = 'Handler function for message type "%s" is not defined.' % message_type
            raise ValueError(msg)

        handler_function(message)
76
77
    def handle_workflow_execution(self, wf_ex_db):
        """Drive a workflow execution forward by requesting its next tasks.

        The conductor is refreshed from the database and, unless the workflow has
        already completed, asked to move to the running state. It is then polled
        for the next tasks. Each batch of tasks is marked as running, persisted
        and requested for execution. This repeats until the conductor returns no
        more tasks, the workflow execution reaches a completed state, or a task
        request fails (in which case the workflow execution is failed).

        :param wf_ex_db: Workflow execution record taken from the queue.
        """
        identify_msg = (
            '[%s] Identifying next set (%s) of tasks for '
            'workflow execution in state "%s".'
        )
        completed_msg = '[%s] Workflow execution is in completed state "%s".'
        root_ex_id = wf_ex_db.action_execution
        round_number = 0

        def identify_next_tasks(current_conductor, current_round):
            # Log the lookup, dump the conductor at debug level and fetch the next tasks.
            LOG.info(
                identify_msg,
                root_ex_id,
                str(current_round),
                current_conductor.get_workflow_state()
            )
            LOG.debug('[%s] %s', root_ex_id, current_conductor.serialize())
            return current_conductor.get_next_tasks()

        LOG.info('[%s] Processing request for workflow execution.', root_ex_id)

        # The request may have waited in the queue for a while, so reload the record.
        conductor, wf_ex_db = wf_svc.refresh_conductor(str(wf_ex_db.id))

        # A workflow that has not completed yet is asked to move to the running state.
        workflow_is_active = conductor.get_workflow_state() not in states.COMPLETED_STATES

        if workflow_is_active:
            LOG.info('[%s] Requesting conductor to start running workflow execution.', root_ex_id)
            conductor.request_workflow_state(states.RUNNING)

        pending_tasks = identify_next_tasks(conductor, round_number)

        # With nothing to run, persist the records so a possible completion is handled.
        if not pending_tasks:
            LOG.info('[%s] No tasks identified to execute next.', root_ex_id)
            wf_svc.update_execution_records(wf_ex_db, conductor)

        # Nothing more to do once the workflow execution is no longer active.
        if wf_ex_db.status in states.COMPLETED_STATES:
            LOG.info(completed_msg, root_ex_id, wf_ex_db.status)
            return

        # Keep going while tasks are identified. A task with no action execution
        # defined completes immediately and makes a new set of tasks available.
        while pending_tasks:
            task_ids = ', '.join(task['id'] for task in pending_tasks)
            LOG.info(
                '[%s] Identified the following set of tasks to execute next: %s',
                root_ex_id,
                task_ids
            )

            # Flag every task as running in the task flow before requesting execution.
            for task in pending_tasks:
                LOG.info('[%s] Mark task "%s" in conductor as running.', root_ex_id, task['id'])
                running_event = events.ActionExecutionEvent(states.RUNNING)
                conductor.update_task_flow(task['id'], running_event)

            # Persist the workflow execution and its related liveaction and action execution.
            wf_svc.update_execution_records(wf_ex_db, conductor)

            # Stop here if the update moved the workflow execution to a completed state.
            if wf_ex_db.status in states.COMPLETED_STATES:
                LOG.info(completed_msg, root_ex_id, wf_ex_db.status)
                return

            # Request execution of each task; the first failure fails the whole workflow.
            for task in pending_tasks:
                try:
                    LOG.info('[%s] Requesting execution for task "%s".', root_ex_id, task['id'])
                    wf_svc.request_task_execution(
                        wf_ex_db,
                        task['id'],
                        task['spec'],
                        task['ctx'],
                        {'execution_id': wf_ex_db.action_execution}
                    )
                except Exception as e:
                    LOG.exception('[%s] Failed task execution for "%s".', root_ex_id, task['id'])
                    wf_svc.fail_workflow_execution(str(wf_ex_db.id), e, task_id=task['id'])
                    return

            # Reload the conductor and look for the next set of tasks.
            round_number += 1
            conductor, wf_ex_db = wf_svc.refresh_conductor(str(wf_ex_db.id))
            pending_tasks = identify_next_tasks(conductor, round_number)

            if not pending_tasks:
                LOG.info('[%s] No tasks identified to execute next.', root_ex_id)
153
154
    def handle_action_execution(self, ac_ex_db):
        """React to an updated action execution that runs under an orquesta workflow.

        Action executions without an ``orquesta`` entry in their context, or whose
        task execution is already in a completed state, are ignored. Pending and
        paused action executions are handed to their dedicated workflow service
        handlers. Completed action executions get post run policies applied before
        their completion is processed. Any other status is ignored.

        :param ac_ex_db: Action execution record taken from the queue.
        """
        # Only action executions executed under an orquesta workflow are relevant.
        if 'orquesta' not in ac_ex_db.context:
            return

        # Identifiers of the related workflow and task execution records.
        orquesta_ctx = ac_ex_db.context['orquesta']
        wf_ex_id = orquesta_ctx['workflow_execution_id']
        task_ex_id = orquesta_ctx['task_execution_id']

        # The execution records are loaded for logging and for the task status check.
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id)
        task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)

        root_ex_id = wf_ex_db.action_execution
        ac_ex_id = str(ac_ex_db.id)

        LOG.info(
            '[%s] Action execution "%s" for task "%s" is updated and in "%s" state.',
            root_ex_id,
            ac_ex_id,
            task_ex_db.task_id,
            ac_ex_db.status
        )

        # Nothing to process when the task execution has already completed.
        if task_ex_db.status in states.COMPLETED_STATES:
            skip_msg = (
                '[%s] Action execution "%s" for task "%s" is not processed because '
                'task execution "%s" is already in completed state "%s".'
            )
            LOG.info(
                skip_msg,
                root_ex_id,
                ac_ex_id,
                task_ex_db.task_id,
                str(task_ex_db.id),
                task_ex_db.status
            )
            return

        ac_ex_status = ac_ex_db.status

        if ac_ex_status == ac_const.LIVEACTION_STATUS_PENDING:
            # Pending request on the action execution.
            wf_svc.handle_action_execution_pending(ac_ex_db)
        elif ac_ex_status == ac_const.LIVEACTION_STATUS_PAUSED:
            # Pause request on the action execution.
            wf_svc.handle_action_execution_pause(ac_ex_db)
        elif ac_ex_status in ac_const.LIVEACTION_COMPLETED_STATES:
            # Apply post run policies first, then process the completion.
            lv_ac_db = lv_db_access.LiveAction.get_by_id(ac_ex_db.liveaction['id'])
            pc_svc.apply_post_run_policies(lv_ac_db)
            wf_svc.handle_action_execution_completion(ac_ex_db)
205
206
207
def get_engine():
    """Build the workflow execution handler for the workflow execution queues.

    A kombu connection is opened with the URLs from
    ``txpt_utils.get_messaging_urls()`` and used as a context manager while the
    handler is constructed.

    :returns: ``WorkflowExecutionHandler`` created with ``WORKFLOW_EXECUTION_QUEUES``.
    """
    messaging_urls = txpt_utils.get_messaging_urls()

    with kombu.Connection(messaging_urls) as connection:
        engine = WorkflowExecutionHandler(connection, WORKFLOW_EXECUTION_QUEUES)

    return engine
210