GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — plexxi-v2.3.2 ( 5d46fe )
by
unknown
06:59
created

WiringTest   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 329
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 329
rs 8.3396
wmc 44

21 Methods

Rating   Name   Duplication   Size   Complexity  
A setUp() 0 6 1
A test_basic_workbook() 0 5 1
A test_complex_workbook_subflow_actions() 0 7 1
A test_with_items() 0 6 1
A tearDown() 0 6 4
A test_complex_workbook_with_jinja() 0 6 1
A test_concurrent_load() 0 11 3
A test_execution_failure() 0 4 1
A test_complex_workbook_with_yaql() 0 6 1
A test_basic_workflow() 0 5 1
B test_cancellation_cascade_subworkflow_action() 0 29 3
A test_cancellation() 0 12 1
A test_basic_rerun_task() 0 20 3
A test_basic_rerun() 0 20 3
A test_task_cancellation() 0 16 3
B test_rerun_subflow_and_resume_with_items_task() 0 33 3
B test_rerun_subflow_and_reset_with_items_task() 0 28 3
B test_basic_rerun_and_resume_with_items_task() 0 33 3
B test_basic_rerun_and_reset_with_items_task() 0 28 3
A test_rerun_subflow_task() 0 21 3
A test_invoke_from_action_chain() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like WiringTest often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import shutil
18
import tempfile
19
20
import eventlet
21
22
from integration.mistral import base
23
24
25
class WiringTest(base.TestWorkflowExecution):
26
27
    temp_dir_path = None
28
29
    def setUp(self):
30
        super(WiringTest, self).setUp()
31
32
        # Create temporary directory used by the tests
33
        _, self.temp_dir_path = tempfile.mkstemp()
34
        os.chmod(self.temp_dir_path, 0755)   # nosec
35
36
    def tearDown(self):
37
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
38
            if os.path.isdir(self.temp_dir_path):
39
                shutil.rmtree(self.temp_dir_path)
40
            else:
41
                os.remove(self.temp_dir_path)
42
43
    def test_basic_workflow(self):
44
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'date'})
45
        execution = self._wait_for_completion(execution)
46
        self._assert_success(execution, num_tasks=1)
47
        self.assertIn('stdout', execution.result)
48
49
    def test_basic_workbook(self):
50
        execution = self._execute_workflow('examples.mistral-workbook-basic', {'cmd': 'date'})
51
        execution = self._wait_for_completion(execution)
52
        self._assert_success(execution, num_tasks=1)
53
        self.assertIn('stdout', execution.result)
54
55
    def test_complex_workbook_with_yaql(self):
56
        execution = self._execute_workflow(
57
            'examples.mistral-workbook-complex', {'vm_name': 'demo1'})
58
        execution = self._wait_for_completion(execution)
59
        self._assert_success(execution, num_tasks=8)
60
        self.assertIn('vm_id', execution.result)
61
62
    def test_complex_workbook_with_jinja(self):
63
        execution = self._execute_workflow(
64
            'examples.mistral-jinja-workbook-complex', {'vm_name': 'demo2'})
65
        execution = self._wait_for_completion(execution)
66
        self._assert_success(execution, num_tasks=8)
67
        self.assertIn('vm_id', execution.result)
68
69
    def test_complex_workbook_subflow_actions(self):
70
        execution = self._execute_workflow(
71
            'examples.mistral-workbook-subflows', {'subject': 'st2', 'adjective': 'cool'})
72
        execution = self._wait_for_completion(execution)
73
        self._assert_success(execution, num_tasks=2)
74
        self.assertIn('tagline', execution.result)
75
        self.assertEqual(execution.result['tagline'], 'st2 is cool!')
76
77
    def test_with_items(self):
78
        params = {'cmd': 'date', 'count': 8}
79
        execution = self._execute_workflow('examples.mistral-repeat', params)
80
        execution = self._wait_for_completion(execution)
81
        self._assert_success(execution, num_tasks=1)
82
        self.assertEqual(len(execution.result['result']), params['count'])
83
84
    def test_concurrent_load(self):
85
        wf_name = 'examples.mistral-workbook-complex'
86
        wf_params = {'vm_name': 'demo1'}
87
        executions = [self._execute_workflow(wf_name, wf_params) for i in range(3)]
88
89
        eventlet.sleep(30)
90
91
        for execution in executions:
92
            e = self._wait_for_completion(execution)
93
            self._assert_success(e, num_tasks=8)
94
            self.assertIn('vm_id', e.result)
95
96
    def test_execution_failure(self):
97
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'foo'})
98
        execution = self._wait_for_completion(execution)
99
        self._assert_failure(execution)
100
101
    def test_cancellation(self):
102
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 10})
103
        execution = self._wait_for_state(execution, ['running'])
104
        self.st2client.liveactions.delete(execution)
105
106
        execution = self._wait_for_completion(
107
            execution,
108
            expect_tasks=False,
109
            expect_tasks_completed=False
110
        )
111
112
        self._assert_canceled(execution, are_tasks_completed=False)
113
114
    def test_cancellation_cascade_subworkflow_action(self):
115
        execution = self._execute_workflow(
116
            'examples.mistral-test-cancel-subworkflow-action',
117
            {'sleep': 30}
118
        )
119
120
        execution = self._wait_for_state(execution, ['running'])
121
        self.st2client.liveactions.delete(execution)
122
123
        execution = self._wait_for_completion(
124
            execution,
125
            expect_tasks=False,
126
            expect_tasks_completed=False
127
        )
128
129
        self.assertEqual(execution.status, 'canceled')
130
131
        task_executions = [e for e in self.st2client.liveactions.get_all()
132
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
133
134
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
135
136
        subworkflow_execution = self._wait_for_completion(
137
            subworkflow_execution,
138
            expect_tasks=False,
139
            expect_tasks_completed=False
140
        )
141
142
        self.assertEqual(execution.status, 'canceled')
143
144
    def test_task_cancellation(self):
145
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 30})
146
        execution = self._wait_for_state(execution, ['running'])
147
148
        task_executions = [e for e in self.st2client.liveactions.get_all()
149
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
150
151
        self.assertGreater(len(task_executions), 0)
152
153
        self.st2client.liveactions.delete(task_executions[0])
154
        execution = self._wait_for_completion(execution, expect_tasks_completed=True)
155
        self._assert_canceled(execution, are_tasks_completed=True)
156
157
        task_results = execution.result.get('tasks', [])
158
        self.assertGreater(len(task_results), 0)
159
        self.assertEqual(task_results[0]['state'], 'CANCELLED')
160
161
    def test_basic_rerun(self):
162
        path = self.temp_dir_path
163
164
        with open(path, 'w') as f:
165
            f.write('1')
166
167
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
168
        execution = self._wait_for_completion(execution)
169
        self._assert_failure(execution)
170
        orig_st2_ex_id = execution.id
171
        orig_wf_ex_id = execution.context['mistral']['execution_id']
172
173
        with open(path, 'w') as f:
174
            f.write('0')
175
176
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id)
177
        self.assertNotEqual(execution.id, orig_st2_ex_id)
178
        execution = self._wait_for_completion(execution)
179
        self._assert_success(execution, num_tasks=1)
180
        self.assertNotEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
181
182
    def test_basic_rerun_task(self):
183
        path = self.temp_dir_path
184
185
        with open(path, 'w') as f:
186
            f.write('1')
187
188
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
189
        execution = self._wait_for_completion(execution)
190
        self._assert_failure(execution)
191
        orig_st2_ex_id = execution.id
192
        orig_wf_ex_id = execution.context['mistral']['execution_id']
193
194
        with open(path, 'w') as f:
195
            f.write('0')
196
197
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
198
        self.assertNotEqual(execution.id, orig_st2_ex_id)
199
        execution = self._wait_for_completion(execution)
200
        self._assert_success(execution, num_tasks=1)
201
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
202
203
    def test_rerun_subflow_task(self):
204
        path = self.temp_dir_path
205
206
        with open(path, 'w') as f:
207
            f.write('1')
208
209
        workflow_name = 'examples.mistral-test-rerun-subflow'
210
        execution = self._execute_workflow(workflow_name, {'tempfile': path})
211
        execution = self._wait_for_completion(execution)
212
        self._assert_failure(execution)
213
        orig_st2_ex_id = execution.id
214
        orig_wf_ex_id = execution.context['mistral']['execution_id']
215
216
        with open(path, 'w') as f:
217
            f.write('0')
218
219
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
220
        self.assertNotEqual(execution.id, orig_st2_ex_id)
221
        execution = self._wait_for_completion(execution)
222
        self._assert_success(execution, num_tasks=1)
223
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
224
225
    def test_basic_rerun_and_reset_with_items_task(self):
226
        path = self.temp_dir_path
227
228
        with open(path, 'w') as f:
229
            f.write('1')
230
231
        execution = self._execute_workflow(
232
            'examples.mistral-test-rerun-with-items',
233
            {'tempfile': path}
234
        )
235
236
        execution = self._wait_for_completion(execution)
237
        self._assert_failure(execution)
238
        orig_st2_ex_id = execution.id
239
        orig_wf_ex_id = execution.context['mistral']['execution_id']
240
241
        with open(path, 'w') as f:
242
            f.write('0')
243
244
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
245
        self.assertNotEqual(execution.id, orig_st2_ex_id)
246
247
        execution = self._wait_for_completion(execution)
248
        self._assert_success(execution, num_tasks=1)
249
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
250
251
        children = self.st2client.liveactions.get_property(execution.id, 'children')
252
        self.assertEqual(len(children), 4)
253
254
    def test_basic_rerun_and_resume_with_items_task(self):
255
        path = self.temp_dir_path
256
257
        with open(path, 'w') as f:
258
            f.write('1')
259
260
        execution = self._execute_workflow(
261
            'examples.mistral-test-rerun-with-items',
262
            {'tempfile': path}
263
        )
264
265
        execution = self._wait_for_completion(execution)
266
        self._assert_failure(execution)
267
        orig_st2_ex_id = execution.id
268
        orig_wf_ex_id = execution.context['mistral']['execution_id']
269
270
        with open(path, 'w') as f:
271
            f.write('0')
272
273
        execution = self.st2client.liveactions.re_run(
274
            orig_st2_ex_id,
275
            tasks=['task1'],
276
            no_reset=['task1']
277
        )
278
279
        self.assertNotEqual(execution.id, orig_st2_ex_id)
280
281
        execution = self._wait_for_completion(execution)
282
        self._assert_success(execution, num_tasks=1)
283
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
284
285
        children = self.st2client.liveactions.get_property(execution.id, 'children')
286
        self.assertEqual(len(children), 2)
287
288
    def test_rerun_subflow_and_reset_with_items_task(self):
289
        path = self.temp_dir_path
290
291
        with open(path, 'w') as f:
292
            f.write('1')
293
294
        execution = self._execute_workflow(
295
            'examples.mistral-test-rerun-subflow-with-items',
296
            {'tempfile': path}
297
        )
298
299
        execution = self._wait_for_completion(execution)
300
        self._assert_failure(execution)
301
        orig_st2_ex_id = execution.id
302
        orig_wf_ex_id = execution.context['mistral']['execution_id']
303
304
        with open(path, 'w') as f:
305
            f.write('0')
306
307
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
308
        self.assertNotEqual(execution.id, orig_st2_ex_id)
309
310
        execution = self._wait_for_completion(execution)
311
        self._assert_success(execution, num_tasks=1)
312
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
313
314
        children = self.st2client.liveactions.get_property(execution.id, 'children')
315
        self.assertEqual(len(children), 4)
316
317
    def test_rerun_subflow_and_resume_with_items_task(self):
318
        path = self.temp_dir_path
319
320
        with open(path, 'w') as f:
321
            f.write('1')
322
323
        execution = self._execute_workflow(
324
            'examples.mistral-test-rerun-subflow-with-items',
325
            {'tempfile': path}
326
        )
327
328
        execution = self._wait_for_completion(execution)
329
        self._assert_failure(execution)
330
        orig_st2_ex_id = execution.id
331
        orig_wf_ex_id = execution.context['mistral']['execution_id']
332
333
        with open(path, 'w') as f:
334
            f.write('0')
335
336
        execution = self.st2client.liveactions.re_run(
337
            orig_st2_ex_id,
338
            tasks=['task1.task1'],
339
            no_reset=['task1.task1']
340
        )
341
342
        self.assertNotEqual(execution.id, orig_st2_ex_id)
343
344
        execution = self._wait_for_completion(execution)
345
        self._assert_success(execution, num_tasks=1)
346
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
347
348
        children = self.st2client.liveactions.get_property(execution.id, 'children')
349
        self.assertEqual(len(children), 2)
350
351
    def test_invoke_from_action_chain(self):
352
        execution = self._execute_workflow('examples.invoke-mistral-with-jinja', {'cmd': 'date'})
353
        execution = self._wait_for_state(execution, ['succeeded'])
354