Test Failed
Pull Request — master (#4195)
by W
04:08
created

OrchestraRunner._construct_st2_context()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 12
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import copy
19
import uuid
20
21
from orchestra import exceptions as wf_exc
22
from orchestra import states as wf_states
23
24
from st2common.constants import action as ac_const
25
from st2common import log as logging
26
from st2common.persistence import execution as ex_db_access
27
from st2common.persistence import liveaction as lv_db_access
28
from st2common.runners import base as runners
29
from st2common.services import action as ac_svc
30
from st2common.services import workflows as wf_svc
31
from st2common.util import api as api_util
32
33
34
__all__ = [
35
    'OrchestraRunner',
36
    'get_runner',
37
    'get_metadata'
38
]
39
40
41
LOG = logging.getLogger(__name__)
42
43
44
class OrchestraRunner(runners.AsyncActionRunner):
    """Asynchronous action runner that hands workflow executions to ``wf_svc``.

    ``run`` requests a new workflow execution; ``pause``, ``resume`` and
    ``cancel`` forward the matching request for this execution and then fan
    the same request out to child executions that are themselves workflows.
    """

    @staticmethod
    def get_workflow_definition(entry_point):
        """Return the entire text content of the file at ``entry_point``."""
        with open(entry_point, 'r') as def_file:
            return def_file.read()

    def _construct_context(self, wf_ex):
        """Return a deep copy of ``self.context`` extended with the workflow execution id.

        The copy keeps the runner's own context untouched; the id of ``wf_ex``
        is stored as a string under the ``workflow_execution`` key.
        """
        ctx = copy.deepcopy(self.context)
        ctx['workflow_execution'] = str(wf_ex.id)

        return ctx

    def _construct_st2_context(self):
        """Build the ``st2`` context dict that is passed along with the workflow request.

        The result always has an ``st2`` entry holding the public API URL and
        the id of the current action execution. A ``parent`` entry referencing
        the execution's context is added only when that context is non-empty.
        """
        st2_ctx = {
            'st2': {
                'api_url': api_util.get_full_public_api_url(),
                'action_execution_id': str(self.execution.id)
            }
        }

        if self.execution.context:
            st2_ctx['parent'] = self.execution.context

        return st2_ctx

    def _request_for_child_workflows(self, statuses, request_func):
        """Apply ``request_func`` to every child workflow whose status is in ``statuses``.

        Shared by ``pause``, ``resume`` and ``cancel`` so the three operations
        walk the children of this execution in exactly the same way.

        :param statuses: Collection of action execution statuses that make a
            child eligible for the request.
        :param request_func: Callable invoked as
            ``request_func(liveaction_db, requester)`` for each eligible child,
            where ``requester`` is the ``user`` entry of the runner context
            (``None`` when absent).
        """
        for child_ex_id in self.execution.children:
            child_ex = ex_db_access.ActionExecution.get(id=child_ex_id)

            # Only children that are workflows themselves need the request.
            if child_ex.runner['name'] not in ac_const.WORKFLOW_RUNNER_TYPES:
                continue

            if child_ex.status not in statuses:
                continue

            request_func(
                lv_db_access.LiveAction.get(id=child_ex.liveaction['id']),
                self.context.get('user', None)
            )

    def run(self, action_parameters):
        """Request a workflow execution for the definition found at ``self.entry_point``.

        :param action_parameters: Not used by this method.
        :return: ``(status, result, context)``. On success the status is
            ``LIVEACTION_STATUS_RUNNING`` with an empty partial result and a
            context carrying the workflow execution id. If the request raises,
            the status is ``LIVEACTION_STATUS_FAILED``, the result holds the
            errors and the unmodified runner context is returned.
        """
        # Read workflow definition from file. This happens outside of the try
        # block on purpose so an unreadable file still propagates as before.
        wf_def = self.get_workflow_definition(self.entry_point)

        try:
            # Request workflow execution.
            st2_ctx = self._construct_st2_context()
            wf_ex_db = wf_svc.request(wf_def, self.execution, st2_ctx)
        except wf_exc.WorkflowInspectionError as e:
            # The second exception argument is reported back as the errors.
            status = ac_const.LIVEACTION_STATUS_FAILED
            result = {'errors': e.args[1], 'output': None}
            return (status, result, self.context)
        except Exception as e:
            # The result only keeps str(e), so record the traceback in the log
            # before it is lost.
            LOG.exception(
                'Failed to request workflow execution for entry point "%s".',
                self.entry_point
            )
            status = ac_const.LIVEACTION_STATUS_FAILED
            result = {'errors': str(e), 'output': None}
            return (status, result, self.context)

        # Set return values.
        status = ac_const.LIVEACTION_STATUS_RUNNING
        partial_results = {}
        ctx = self._construct_context(wf_ex_db)

        return (status, partial_results, ctx)

    def pause(self):
        """Request pause of the workflow and of its running child workflows.

        :return: ``(status, result, context)`` where the status is
            ``LIVEACTION_STATUS_PAUSING`` while the workflow execution is still
            pausing or ``ac_svc.is_children_active`` reports active children,
            and ``LIVEACTION_STATUS_PAUSED`` otherwise. Result and context are
            taken from the live action.
        """
        # Pause the target workflow.
        wf_ex_db = wf_svc.request_pause(self.execution)

        # Request pause of tasks that are workflows and still running.
        self._request_for_child_workflows(
            (ac_const.LIVEACTION_STATUS_RUNNING,),
            ac_svc.request_pause
        )

        status = (
            ac_const.LIVEACTION_STATUS_PAUSING
            if wf_ex_db.status == wf_states.PAUSING or ac_svc.is_children_active(self.liveaction.id)
            else ac_const.LIVEACTION_STATUS_PAUSED
        )

        return (
            status,
            self.liveaction.result,
            self.liveaction.context
        )

    def resume(self):
        """Request resume of the workflow and of its paused child workflows.

        :return: ``(LIVEACTION_STATUS_RUNNING, result, context)`` with result
            and context taken from the live action.
        """
        # Resume the target workflow.
        wf_svc.request_resume(self.execution)

        # Request resume of tasks that are workflows and currently paused.
        self._request_for_child_workflows(
            (ac_const.LIVEACTION_STATUS_PAUSED,),
            ac_svc.request_resume
        )

        return (
            ac_const.LIVEACTION_STATUS_RUNNING,
            self.liveaction.result,
            self.liveaction.context
        )

    def cancel(self):
        """Request cancellation of the workflow and of its cancelable child workflows.

        :return: ``(status, result, context)`` where the status is
            ``LIVEACTION_STATUS_CANCELING`` while ``ac_svc.is_children_active``
            reports active children and ``LIVEACTION_STATUS_CANCELED``
            otherwise. Result and context are taken from the live action.
        """
        # Cancel the target workflow.
        wf_svc.request_cancellation(self.execution)

        # Request cancellation of tasks that are workflows and still cancelable.
        self._request_for_child_workflows(
            ac_const.LIVEACTION_CANCELABLE_STATES,
            ac_svc.request_cancellation
        )

        status = (
            ac_const.LIVEACTION_STATUS_CANCELING
            if ac_svc.is_children_active(self.liveaction.id)
            else ac_const.LIVEACTION_STATUS_CANCELED
        )

        return (
            status,
            self.liveaction.result,
            self.liveaction.context
        )
165
166
167
def get_runner():
    """Create an ``OrchestraRunner`` initialised with a freshly generated UUID4 string."""
    # presumably the constructor argument is the runner id - verify against the base class
    generated_id = str(uuid.uuid4())
    return OrchestraRunner(generated_id)
169
170
171
def get_metadata():
    """Return what ``runners.get_metadata`` provides for ``'orchestra_runner'``."""
    metadata = runners.get_metadata('orchestra_runner')
    return metadata
173