Test Setup Failed
Pull Request — master (#4154)
by W
03:37
created

OrchestraRunner.pause()   B

Complexity

Conditions 5

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
c 0
b 0
f 0
dl 0
loc 24
rs 8.1671
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import copy
19
import uuid
20
21
from orchestra import exceptions as wf_exc
22
from orchestra import states as wf_states
23
24
from st2common.constants import action as ac_const
25
from st2common import log as logging
26
from st2common.persistence import execution as ex_db_access
27
from st2common.persistence import liveaction as lv_db_access
28
from st2common.runners import base as runners
29
from st2common.services import action as ac_svc
30
from st2common.services import workflows as wf_svc
31
32
33
# Public names exported by a star-import of this module.
__all__ = ['OrchestraRunner', 'get_runner', 'get_metadata']

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
41
42
43
class OrchestraRunner(runners.AsyncActionRunner):
    """Asynchronous action runner that hands workflow requests to the workflow service.

    ``run`` requests a new workflow execution; ``pause``, ``resume`` and
    ``cancel`` forward the matching request for this execution and for any
    child executions that are themselves workflows. Each of these methods
    returns a ``(status, result, context)`` tuple.
    """

    @staticmethod
    def get_workflow_definition(entry_point):
        """Return the entire content of the workflow definition file.

        :param entry_point: Path of the file to read.
        :return: The text read from the file.
        """
        with open(entry_point, 'r') as def_file:
            return def_file.read()

    def _construct_context(self, wf_ex):
        """Return a deep copy of ``self.context`` tagged with the workflow execution id.

        :param wf_ex: Workflow execution object; only its ``id`` is read.
        :return: The copied context with the ``workflow_execution`` key set to
            ``str(wf_ex.id)``. ``self.context`` itself is not modified.
        """
        ctx = copy.deepcopy(self.context)
        ctx['workflow_execution'] = str(wf_ex.id)

        return ctx

    def _request_on_child_workflows(self, eligible_statuses, request_func):
        """Issue ``request_func`` on each child workflow in an eligible status.

        Shared by ``pause``, ``resume`` and ``cancel``, which differ only in
        the statuses they select and the action service call they make.

        :param eligible_statuses: Collection of statuses; a child execution is
            selected when its status is a member of this collection.
        :param request_func: Callable invoked as
            ``request_func(liveaction_db, requester)`` for each selected child.
        """
        for child_ex_id in self.execution.children:
            child_ex = ex_db_access.ActionExecution.get(id=child_ex_id)

            # Only children that are workflows themselves need the request
            # forwarded to them.
            if child_ex.runner['name'] not in ac_const.WORKFLOW_RUNNER_TYPES:
                continue

            if child_ex.status not in eligible_statuses:
                continue

            request_func(
                lv_db_access.LiveAction.get(id=child_ex.liveaction['id']),
                self.context.get('user', None)
            )

    def run(self, action_parameters):
        """Request execution of the workflow defined at ``self.entry_point``.

        :param action_parameters: Not read by this method.
        :return: ``(status, result, context)``. On success the status is
            RUNNING, the result is an empty dict and the context carries the
            new workflow execution id. If the request raises, the status is
            FAILED, the result holds the error details under ``errors`` with
            ``output`` set to ``None``, and the context is ``self.context``.
        """
        # Read workflow definition from file.
        wf_def = self.get_workflow_definition(self.entry_point)

        try:
            # Request workflow execution.
            wf_ex_db = wf_svc.request(wf_def, self.execution)
        except wf_exc.WorkflowInspectionError as e:
            # The second exception argument is reported as the errors.
            status = ac_const.LIVEACTION_STATUS_FAILED
            result = {'errors': e.args[1], 'output': None}
            return (status, result, self.context)
        except Exception as e:
            # Any other failure is reported as a failed execution instead of
            # being propagated to the caller.
            status = ac_const.LIVEACTION_STATUS_FAILED
            result = {'errors': str(e), 'output': None}
            return (status, result, self.context)

        # Set return values.
        status = ac_const.LIVEACTION_STATUS_RUNNING
        partial_results = {}
        ctx = self._construct_context(wf_ex_db)

        return (status, partial_results, ctx)

    def pause(self):
        """Request pause of the workflow and of its running child workflows.

        :return: ``(status, result, context)`` where the status is PAUSING
            while the workflow reports PAUSING or children are still active,
            and PAUSED otherwise. The result and context are taken from
            ``self.liveaction``.
        """
        # Pause the target workflow.
        wf_ex_db = wf_svc.request_pause(self.execution)

        # Request pause of tasks that are workflows and still running.
        self._request_on_child_workflows(
            (ac_const.LIVEACTION_STATUS_RUNNING,),
            ac_svc.request_pause
        )

        status = (
            ac_const.LIVEACTION_STATUS_PAUSING
            if wf_ex_db.status == wf_states.PAUSING or ac_svc.is_children_active(self.liveaction.id)
            else ac_const.LIVEACTION_STATUS_PAUSED
        )

        return (
            status,
            self.liveaction.result,
            self.liveaction.context
        )

    def resume(self):
        """Request resume of the workflow and of its paused child workflows.

        :return: ``(status, result, context)`` with the status always RUNNING
            and the result and context taken from ``self.liveaction``.
        """
        # Resume the target workflow.
        wf_svc.request_resume(self.execution)

        # Request resume of tasks that are workflows and currently paused.
        self._request_on_child_workflows(
            (ac_const.LIVEACTION_STATUS_PAUSED,),
            ac_svc.request_resume
        )

        return (
            ac_const.LIVEACTION_STATUS_RUNNING,
            self.liveaction.result,
            self.liveaction.context
        )

    def cancel(self):
        """Request cancellation of the workflow and of its cancelable child workflows.

        :return: ``(status, result, context)`` where the status is CANCELING
            while children are still active and CANCELED otherwise. The result
            and context are taken from ``self.liveaction``.
        """
        # Cancel the target workflow.
        wf_svc.request_cancellation(self.execution)

        # Request cancellation of tasks that are workflows and still cancelable.
        self._request_on_child_workflows(
            ac_const.LIVEACTION_CANCELABLE_STATES,
            ac_svc.request_cancellation
        )

        status = (
            ac_const.LIVEACTION_STATUS_CANCELING
            if ac_svc.is_children_active(self.liveaction.id)
            else ac_const.LIVEACTION_STATUS_CANCELED
        )

        return (
            status,
            self.liveaction.result,
            self.liveaction.context
        )
150
151
152
def get_runner():
    """Build an ``OrchestraRunner`` whose constructor argument is a fresh UUID4 string."""
    runner_id = str(uuid.uuid4())
    return OrchestraRunner(runner_id)
154
155
156
def get_metadata():
    """Return whatever ``runners.get_metadata`` yields for ``'orchestra_runner'``."""
    runner_name = 'orchestra_runner'
    return runners.get_metadata(runner_name)
158