Test Setup Failed
Pull Request — master (#4154)
by W
04:10
created

WorkflowDispatcher.process()   F

Complexity

Conditions 9

Size

Total Lines 49

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 9
c 1
b 0
f 0
dl 0
loc 49
rs 3.75
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import kombu
19
20
from orchestra import states
21
22
from st2common import log as logging
23
from st2common.models.db import workflow as wf_db_models
24
from st2common.services import workflows as wf_svc
25
from st2common.transport import consumers
26
from st2common.transport import queues
27
from st2common.transport import utils as txpt_utils
28
29
30
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


# Queues handed to the WorkflowDispatcher built by get_engine().
WORKFLOW_EXECUTION_QUEUES = [
    queues.WORKFLOW_EXECUTION_WORK_QUEUE,
    queues.WORKFLOW_EXECUTION_RESUME_QUEUE,
]
37
38
39
class WorkflowDispatcher(consumers.MessageHandler):
    """Message handler that advances workflow executions.

    Each received ``WorkflowExecutionDB`` message is handled by ``process``, which
    asks the conductor for the next tasks and requests their execution until no
    further tasks are available or the workflow execution has completed.
    """

    message_type = wf_db_models.WorkflowExecutionDB

    # NOTE(review): static analysis flags the ``queues`` parameter of the two methods
    # below as shadowing the module-level ``queues`` import. The parameter name is kept
    # so that callers passing it by keyword keep working.
    def __init__(self, connection, queues):
        """Delegate construction to the parent handler.

        :param connection: Messaging connection passed through to the parent class.
        :param queues: Queues passed through to the parent class.
        """
        super(WorkflowDispatcher, self).__init__(connection, queues)

    def get_queue_consumer(self, connection, queues):
        """Return a ``consumers.QueueConsumer`` that dispatches messages to this handler.

        :param connection: Messaging connection the consumer is bound to.
        :param queues: Queues the consumer reads from.
        """
        return consumers.QueueConsumer(handler=self, queues=queues, connection=connection)

    def process(self, wf_ex_db):
        """Advance the workflow execution referenced by ``wf_ex_db``.

        :param wf_ex_db: Workflow execution record received from the queue. Only its
            ``id`` is used; the record itself is re-read before any processing.
        """
        # The request may have waited in the queue for a long time, so reload the
        # record (and its conductor) instead of trusting the received copy.
        conductor, wf_ex_db = wf_svc.refresh_conductor(str(wf_ex_db.id))

        # A workflow that has not completed yet is put into the running state.
        workflow_is_active = conductor.get_workflow_state() not in states.COMPLETED_STATES

        if workflow_is_active:
            conductor.set_workflow_state(states.RUNNING)

        pending = conductor.get_next_tasks()

        # With nothing left to run, persist the records to handle possible completion.
        if not pending:
            wf_svc.update_execution_records(wf_ex_db, conductor)

        # Nothing more to do once the workflow execution is no longer active.
        if wf_ex_db.status in states.COMPLETED_STATES:
            return

        # Keep going for as long as the conductor identifies further tasks. A task
        # without an action execution completes immediately, which makes a new set
        # of tasks available right away.
        while pending:
            # Flag the tasks as running in the task flow before actually running them.
            for item in pending:
                conductor.update_task_flow(item['id'], states.RUNNING)

            # Persist the workflow execution and its related liveaction and
            # action execution.
            wf_svc.update_execution_records(wf_ex_db, conductor)

            # Stop here if the workflow execution is no longer active.
            if wf_ex_db.status in states.COMPLETED_STATES:
                return

            # A failed request has already failed the workflow execution; stop here.
            if not self._request_task_executions(wf_ex_db, pending):
                return

            # Reload the state and look for the next set of tasks.
            conductor, wf_ex_db = wf_svc.refresh_conductor(str(wf_ex_db.id))
            pending = conductor.get_next_tasks()

    def _request_task_executions(self, wf_ex_db, tasks):
        """Request execution of every task in ``tasks``, in order.

        On the first task whose request raises, the workflow execution is failed
        with that exception and the remaining tasks are not requested.

        :param wf_ex_db: Workflow execution record the tasks belong to.
        :param tasks: Task dicts providing the ``id``, ``spec`` and ``ctx`` keys.
        :returns: ``True`` when all requests went through, ``False`` otherwise.
        """
        for item in tasks:
            try:
                item_id, item_spec, item_ctx = item['id'], item['spec'], item['ctx']
                st2_ctx = {'execution_id': wf_ex_db.action_execution}
                wf_svc.request_task_execution(wf_ex_db, item_id, item_spec, item_ctx, st2_ctx)
            except Exception as e:
                wf_svc.fail_workflow_execution(str(wf_ex_db.id), e, task_id=item['id'])
                return False

        return True
98
99
100
def get_engine():
    """Build a ``WorkflowDispatcher`` bound to the workflow execution queues.

    The dispatcher is created inside a ``kombu.Connection`` context which has
    already been exited by the time the dispatcher is returned.
    NOTE(review): presumably the connection is re-established lazily when the
    dispatcher starts consuming -- confirm against kombu's behaviour.
    """
    messaging_urls = txpt_utils.get_messaging_urls()

    with kombu.Connection(messaging_urls) as conn:
        engine = WorkflowDispatcher(conn, WORKFLOW_EXECUTION_QUEUES)

    return engine
103