Test Setup Failed
Pull Request — master (#4154)
by W
03:36
created

WorkflowDispatcher.process()   C

Complexity

Conditions 7

Size

Total Lines 53

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
c 1
b 0
f 0
dl 0
loc 53
rs 6.4447

How to fix: Long Method

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import copy
19
import kombu
20
21
from orchestra import states
22
23
from st2common import log as logging
24
from st2common.models.db import workflow as wf_db_models
25
from st2common.persistence import execution as ex_db_access
26
from st2common.persistence import workflow as wf_db_access
27
from st2common.services import executions as ex_svc
28
from st2common.services import workflows as wf_svc
29
from st2common.transport import consumers
30
from st2common.transport import queues
31
from st2common.transport import utils as txpt_utils
32
from st2common.util import action_db as ac_db_util
33
from st2common.util import date as date_utils
34
35
36
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)


# Queues handed to the WorkflowDispatcher created by get_engine(): the work queue
# and the resume queue for workflow executions.
WORKFLOW_EXECUTION_QUEUES = [
    queues.WORKFLOW_EXECUTION_WORK_QUEUE,
    queues.WORKFLOW_EXECUTION_RESUME_QUEUE,
]
43
44
45
class WorkflowDispatcher(consumers.MessageHandler):
    """Message handler that advances workflow executions received from the queues.

    For each ``WorkflowExecutionDB`` message, ``process`` refreshes the conductor,
    marks the next tasks as running, persists the workflow state and then either
    finalizes the parent action execution (workflow completed) or requests
    execution of the next tasks.
    """

    message_type = wf_db_models.WorkflowExecutionDB

    # NOTE: the ``queues`` parameter of ``__init__`` and ``get_queue_consumer`` shadows
    # the module-level ``queues`` import. The name is kept because it is part of the
    # method signatures; do not reference the ``queues`` module inside these methods.
    def __init__(self, connection, queues):
        """Initialize the handler with the messaging connection and queues to consume."""
        super(WorkflowDispatcher, self).__init__(connection, queues)

    def get_queue_consumer(self, connection, queues):
        """Return the consumer that feeds messages from ``queues`` to this handler."""
        # We want to use a special ActionsQueueConsumer which uses 2 dispatcher pools
        return consumers.ActionsQueueConsumer(connection=connection, queues=queues, handler=self)

    def process(self, wf_ex_db):
        """Advance the given workflow execution by one step.

        :param wf_ex_db: The workflow execution record taken off the queue. Only its
            ``id`` is used; the record is reloaded from the database.
        """
        # Refresh record from the database in case the request is in the queue for too long.
        conductor, wf_ex_db = wf_svc.refresh_conductor(str(wf_ex_db.id))

        # Always bound, so the dispatch loop at the end never depends on the early
        # return below to avoid a NameError when the workflow is already completed.
        next_tasks = []

        # Continue if workflow is still active.
        if conductor.get_workflow_state() not in states.COMPLETED_STATES:
            # Set workflow to running state.
            conductor.set_workflow_state(states.RUNNING)

            # Identify the next set of tasks to execute.
            next_tasks = conductor.get_next_tasks()

            # Mark the tasks as running in the task flow before actual task execution.
            for task in next_tasks:
                conductor.update_task_flow(task['id'], states.RUNNING)

        # Update timestamp and output if workflow is no longer running.
        if conductor.get_workflow_state() in states.COMPLETED_STATES:
            wf_ex_db.end_timestamp = date_utils.get_datetime_utc_now()
            wf_ex_db.output = conductor.get_workflow_output()

        # Write the updated workflow state and task flow to the database.
        wf_ex_db.status = conductor.get_workflow_state()
        wf_ex_db.errors = copy.deepcopy(conductor.errors)
        wf_ex_db.flow = conductor.flow.serialize()
        wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

        # If workflow execution is no longer active, then update database and stop.
        if wf_ex_db.status in states.COMPLETED_STATES:
            self._update_action_execution(wf_ex_db)
            return

        # Request task execution for the tasks.
        for task in next_tasks:
            st2_ctx = {'execution_id': wf_ex_db.action_execution}
            wf_svc.request_task_execution(wf_ex_db, task['id'], task['spec'], task['ctx'], st2_ctx)

    def _update_action_execution(self, wf_ex_db):
        """Propagate a completed workflow's status and result to its liveaction and execution.

        :param wf_ex_db: The persisted workflow execution whose status is one of the
            completed states.
        """
        # Update the corresponding liveaction and action execution for the workflow.
        wf_ac_ex_db = ex_db_access.ActionExecution.get_by_id(wf_ex_db.action_execution)
        wf_lv_ac_db = ac_db_util.get_liveaction_by_id(wf_ac_ex_db.liveaction['id'])

        result = wf_ex_db.output

        if wf_ex_db.status in states.ABENDED_STATES:
            # Build a fresh dict instead of assigning into wf_ex_db.output: this avoids
            # mutating the record's output in place and avoids a TypeError if the
            # output is None (presumably possible when a workflow ends abnormally
            # before producing output -- confirm against the conductor).
            result = dict(result or {})
            result['errors'] = wf_ex_db.errors

        wf_lv_ac_db = ac_db_util.update_liveaction_status(
            status=wf_ex_db.status,
            result=result,
            end_timestamp=wf_ex_db.end_timestamp,
            liveaction_db=wf_lv_ac_db)

        ex_svc.update_execution(wf_lv_ac_db)
108
109
110
def get_engine():
    """Create a WorkflowDispatcher bound to the workflow execution queues.

    The dispatcher is constructed inside a ``kombu.Connection`` context manager, so
    the context exits before the dispatcher is handed to the caller.
    NOTE(review): presumably the connection is re-established when the dispatcher
    starts consuming -- confirm.
    """
    messaging_urls = txpt_utils.get_messaging_urls()

    with kombu.Connection(messaging_urls) as connection:
        dispatcher = WorkflowDispatcher(connection, WORKFLOW_EXECUTION_QUEUES)

    return dispatcher
113