Test Setup Failed
Pull Request — master (#4154)
by W
03:24
created

WorkflowTestCase   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 90
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 90
rs 10
wmc 13

10 Methods

Rating   Name   Duplication   Size   Complexity  
A get_wf_fixture_meta_data() 0 10 1
A assert_task_not_started() 0 3 1
A get_wf_def() 0 6 2
A get_action_exs() 0 4 1
A get_action_ex() 0 4 1
A prep_wf_ex() 0 22 2
A assert_task_running() 0 4 1
A get_task_ex() 0 4 1
A run_workflow_step() 0 11 1
A assert_workflow_completed() 0 7 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
try:
18
    import simplejson as json
19
except ImportError:
20
    import json
21
22
import os
23
import os.path
24
import sys
25
import shutil
26
import logging
27
28
import six
29
import eventlet
30
import psutil
31
import mock
32
from oslo_config import cfg
33
from unittest2 import TestCase
34
import unittest2
35
36
from orchestra import conducting
37
from orchestra.specs import loader as specs_loader
38
from orchestra import states as wf_lib_states
39
40
from st2common.util.api import get_full_public_api_url
41
from st2common.constants import action as ac_const
42
from st2common.constants.runners import COMMON_ACTION_ENV_VARIABLES
43
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
44
from st2common.exceptions.db import StackStormDBObjectConflictError
45
from st2common.models.db import db_setup, db_teardown, db_ensure_indexes
46
from st2common.bootstrap.base import ResourceRegistrar
47
from st2common.bootstrap.configsregistrar import ConfigsRegistrar
48
from st2common.content.utils import get_packs_base_paths
49
from st2common.exceptions.db import StackStormDBObjectNotFoundError
50
import st2common.models.db.rule as rule_model
51
import st2common.models.db.rule_enforcement as rule_enforcement_model
52
import st2common.models.db.sensor as sensor_model
53
import st2common.models.db.trigger as trigger_model
54
import st2common.models.db.action as action_model
55
import st2common.models.db.keyvalue as keyvalue_model
56
import st2common.models.db.runner as runner_model
57
import st2common.models.db.execution as execution_model
58
import st2common.models.db.executionstate as executionstate_model
59
import st2common.models.db.liveaction as liveaction_model
60
import st2common.models.db.actionalias as actionalias_model
61
import st2common.models.db.policy as policy_model
62
from st2common.persistence import execution as ex_db_access
63
from st2common.persistence import workflow as wf_db_access
64
from st2common.services import workflows as wf_svc
65
from st2common.util import loader
66
import st2tests.config
67
68
# Imports for backward compatibility (those classes have been moved to standalone modules)
69
from st2tests.actions import BaseActionTestCase
70
from st2tests.sensors import BaseSensorTestCase
71
from st2tests.action_aliases import BaseActionAliasTestCase
72
73
74
# Names exported by "from <this module> import *"
__all__ = [
    # Base test classes defined in this module
    'EventletTestCase',
    'DbTestCase',
    'DbModelTestCase',
    'CleanDbTestCase',
    'CleanFilesTestCase',
    'IntegrationTestCase',
    'RunnerTestCase',
    'WorkflowTestCase',

    # Pack test classes
    'BaseSensorTestCase',
    'BaseActionTestCase',
    'BaseActionAliasTestCase',

    # Path helpers
    'get_fixtures_path',
    'get_resources_path',

    # Mocking helpers
    'blocking_eventlet_spawn',
    'make_mock_stream_readline',
]
95
96
# Directory which contains this module
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TESTS_CONFIG_PATH = os.path.join(BASE_DIR, '../conf/st2.conf')

# Flat list of the model classes exposed by every imported models module. It is used by the
# database test classes to explicitly drop each collection.
ALL_MODELS = []
ALL_MODELS.extend(rule_model.MODELS)
ALL_MODELS.extend(sensor_model.MODELS)
ALL_MODELS.extend(trigger_model.MODELS)
ALL_MODELS.extend(action_model.MODELS)
ALL_MODELS.extend(keyvalue_model.MODELS)
ALL_MODELS.extend(runner_model.MODELS)
ALL_MODELS.extend(execution_model.MODELS)
ALL_MODELS.extend(executionstate_model.MODELS)
ALL_MODELS.extend(liveaction_model.MODELS)
ALL_MODELS.extend(actionalias_model.MODELS)
ALL_MODELS.extend(policy_model.MODELS)
ALL_MODELS.extend(rule_enforcement_model.MODELS)
114
115
116
class RunnerTestCase(unittest2.TestCase):
    """
    Base test class for runner tests which provides runner specific assertion helpers.
    """

    def assertCommonSt2EnvVarsAvailableInEnv(self, env):
        """
        Method which asserts that the common ST2 environment variables are present in the provided
        environment.

        :param env: Environment to check. It is tested with ``in`` and indexed by variable name.
        """
        # assertIn reports the missing variable name on failure, unlike assertTrue(x in y)
        for var_name in COMMON_ACTION_ENV_VARIABLES:
            self.assertIn(var_name, env)

        self.assertEqual(env['ST2_ACTION_API_URL'], get_full_public_api_url())
        self.assertIsNotNone(env[AUTH_TOKEN_ENV_VARIABLE_NAME])
127
128
129
class BaseTestCase(TestCase):
    """
    Base test class which provides class level helpers for registering pack content.
    """

    @classmethod
    def _register_packs(cls):
        """
        Register all the packs found under the paths returned by ``get_packs_base_paths()``.

        The registrar is created with the pack cache disabled.
        """
        registrar = ResourceRegistrar(use_pack_cache=False)
        registrar.register_packs(base_dirs=get_packs_base_paths())

    @classmethod
    def _register_pack_configs(cls, validate_configs=False):
        """
        Register the configs from all the packs found under the paths returned by
        ``get_packs_base_paths()``.

        :param validate_configs: Value passed through to ``ConfigsRegistrar`` as
                                 ``validate_configs``.
        """
        registrar = ConfigsRegistrar(use_pack_cache=False, validate_configs=validate_configs)
        registrar.register_from_packs(base_dirs=get_packs_base_paths())
147
148
149
class EventletTestCase(TestCase):
    """
    Base test class which applies eventlet monkey patching once before the tests of the class
    run and calls ``eventlet.monkey_patch`` again with every module flag turned off once they
    have finished.
    """

    @classmethod
    def setUpClass(cls):
        # The thread module is left alone when "--use-debugger" is on the command line
        patch_thread = '--use-debugger' not in sys.argv
        eventlet.monkey_patch(os=True, select=True, socket=True, thread=patch_thread, time=True)

    @classmethod
    def tearDownClass(cls):
        eventlet.monkey_patch(os=False, select=False, socket=False, thread=False, time=False)
174
175
176
class BaseDbTestCase(BaseTestCase):
    """
    Base class for database backed tests.

    It parses the tests config and provides class level helpers for connecting to the database
    and for dropping the collections and the database itself.
    """

    # Set to True to enable printing of all the log messages to the console
    DISPLAY_LOG_MESSAGES = False

    # Connection returned by db_setup. None until a connection has been established, which
    # keeps _drop_db safe to call before that point.
    db_connection = None

    @classmethod
    def setUpClass(cls):
        """
        Parse the tests config and, when DISPLAY_LOG_MESSAGES is set, configure logging from
        ``../conf/logging.conf`` (relative to this module's directory).
        """
        st2tests.config.parse_args()

        if cls.DISPLAY_LOG_MESSAGES:
            # "import logging" on its own does not load the logging.config submodule
            import logging.config

            config_path = os.path.join(BASE_DIR, '../conf/logging.conf')
            logging.config.fileConfig(config_path, disable_existing_loggers=False)

    @classmethod
    def _establish_connection_and_re_create_db(cls):
        """
        Connect to the configured database, drop all the collections and the database itself
        and then ensure the indexes.
        """
        db_opts = cfg.CONF.database
        username = getattr(db_opts, 'username', None)
        password = getattr(db_opts, 'password', None)

        cls.db_connection = db_setup(
            db_opts.db_name, db_opts.host, db_opts.port,
            username=username, password=password, ensure_indexes=False)
        cls._drop_collections()
        cls.db_connection.drop_database(db_opts.db_name)

        # Explicitly ensure indexes after we re-create the DB, otherwise ensure_indexes could
        # fail inside db_setup if a test inserted invalid data
        db_ensure_indexes()

    @classmethod
    def _drop_db(cls):
        """
        Drop all the collections and the database (when connected) and tear down the connection.
        """
        cls._drop_collections()

        if cls.db_connection is not None:
            cls.db_connection.drop_database(cfg.CONF.database.db_name)

        db_teardown()
        cls.db_connection = None

    @classmethod
    def _drop_collections(cls):
        """
        Drop the collection of every model listed in ALL_MODELS.
        """
        # XXX: Explicitly drop all the collection. Otherwise, artifacts are left over in
        # subsequent tests.
        # See: https://github.com/MongoEngine/mongoengine/issues/566
        # See: https://github.com/MongoEngine/mongoengine/issues/565
        for model in ALL_MODELS:
            model.drop_collection()
221
222
223
class DbTestCase(BaseDbTestCase):
    """
    This class drops and re-creates the database once per TestCase run.

    This means database is only dropped once before all the tests from this class run. This means
    data is persisted between different tests in this class.
    """

    db_connection = None
    # Result object passed to the most recent run() call; read in tearDownClass
    current_result = None
    register_packs = False
    register_pack_configs = False

    @classmethod
    def setUpClass(cls):
        """
        Re-create the database and optionally register packs and pack configs.
        """
        BaseDbTestCase.setUpClass()
        cls._establish_connection_and_re_create_db()

        if cls.register_packs:
            cls._register_packs()

        if cls.register_pack_configs:
            cls._register_pack_configs()

    @classmethod
    def tearDownClass(cls):
        """
        Drop the database unless the remembered result recorded errors or failures.
        """
        result = cls.current_result

        # current_result stays None when run() was never given a result object
        if result is not None and (result.errors or result.failures):
            # Don't drop DB on test failure
            return

        cls._drop_db()

    def run(self, result=None):
        """
        Remember the result object and delegate to the parent implementation.

        :return: Whatever the parent ``run`` returns.
        """
        # Remember result for use in tearDown and tearDownClass
        self.current_result = result
        self.__class__.current_result = result
        return super(DbTestCase, self).run(result=result)
263
264
265
class DbModelTestCase(DbTestCase):
    """
    Base class for model tests which exercise a persistence access class.

    Subclasses set ``access_type``; ``db_type`` is derived from it in ``setUpClass``.
    """

    access_type = None

    @classmethod
    def setUpClass(cls):
        super(DbModelTestCase, cls).setUpClass()
        cls.db_type = cls.access_type.impl.model

    def _assert_fields_equal(self, a, b, exclude=None):
        """
        Assert that ``a`` and ``b`` hold equal values for every field of ``db_type``.

        :param a: First object to compare.
        :param b: Second object to compare.
        :param exclude: Optional collection of field names to skip.
        """
        skipped = exclude or []

        # NOTE(review): these entries only match if str(field) is the dotted class path -
        # confirm. Anything which does not match is compared with assertEqual.
        comparators = {
            'mongoengine.fields.DictField': self.assertDictEqual,
            'mongoengine.fields.ListField': self.assertListEqual,
            'mongoengine.fields.SortedListField': self.assertListEqual
        }

        for field_name, field in six.iteritems(self.db_type._fields):
            if field_name in skipped:
                continue

            compare = comparators.get(str(field), self.assertEqual)
            compare(getattr(a, field_name, None), getattr(b, field_name, None))

    def _assert_values_equal(self, a, values=None):
        """
        Assert that the attributes of ``a`` match the provided values.

        :param a: Object to inspect. A missing attribute is treated as None.
        :param values: Optional dict which maps attribute name to the expected value.
        """
        expected = values or {}

        # Comparison method is picked based on the type name of the expected value
        comparators = {
            'dict': self.assertDictEqual,
            'list': self.assertListEqual
        }

        for attr_name, expected_value in six.iteritems(expected):
            compare = comparators.get(type(expected_value).__name__, self.assertEqual)
            compare(getattr(a, attr_name, None), expected_value)

    def _assert_crud(self, instance, defaults=None, updates=None):
        """
        Run the instance through a create, read, update and delete cycle using ``access_type``.

        :param instance: Model instance which has not been saved yet.
        :param defaults: Optional dict of values the unsaved instance is expected to have.
        :param updates: Optional dict of attribute values to set before the update step.
        """
        # Assert instance is not already in the database.
        self.assertIsNone(getattr(instance, 'id', None))

        # Assert default values are assigned.
        self._assert_values_equal(instance, values=defaults)

        # Assert instance is created in the database.
        created = self.access_type.add_or_update(instance)
        self.assertIsNotNone(created.id)
        self._assert_fields_equal(instance, created, exclude=['id'])
        fetched = self.access_type.get_by_id(created.id)
        self._assert_fields_equal(created, fetched)

        # Assert instance is updated in the database.
        changes = updates or {}

        for attr_name, new_value in six.iteritems(changes):
            setattr(instance, attr_name, new_value)

        updated = self.access_type.add_or_update(instance)
        self._assert_fields_equal(instance, updated)

        # Assert instance is deleted from the database.
        fetched = self.access_type.get_by_id(instance.id)
        fetched.delete()
        instance_id = instance.id

        with self.assertRaises(StackStormDBObjectNotFoundError):
            self.access_type.get_by_id(instance_id)

    def _assert_unique_key_constraint(self, instance):
        """
        Assert that saving the same instance a second time, without its id, raises a conflict.

        :param instance: Model instance which has not been saved yet.
        """
        # Assert instance is not already in the database.
        self.assertIsNone(getattr(instance, 'id', None))

        # Assert instance is created in the database.
        created = self.access_type.add_or_update(instance)
        self.assertIsNotNone(created.id)

        # Assert exception is thrown if try to create same instance again.
        delattr(instance, 'id')

        with self.assertRaises(StackStormDBObjectConflictError):
            self.access_type.add_or_update(instance)
339
340
341
class CleanDbTestCase(BaseDbTestCase):
    """
    Class which ensures database is re-created before running each test method.

    This means each test inside this class is self-sustained and starts with a clean (empty)
    database.
    """

    register_packs = False
    register_pack_configs = False

    def setUp(self):
        """
        Re-create the database, optionally register packs and pack configs and then chain up.
        """
        self._establish_connection_and_re_create_db()

        if self.register_packs:
            self._register_packs()

        if self.register_pack_configs:
            self._register_pack_configs()

        # Chain up last so any other setUp in the MRO (e.g. from a mixin class) is not skipped
        # and runs against the freshly re-created database
        super(CleanDbTestCase, self).setUp()
360
361
362
class CleanFilesTestCase(TestCase):
    """
    Base test class which deletes specified files and directories on ``setUp`` and ``tearDown``.
    """

    # Paths of the files to remove
    to_delete_files = []
    # Paths of the directories to remove (recursively)
    to_delete_directories = []

    def setUp(self):
        super(CleanFilesTestCase, self).setUp()
        self._delete_files()

    def tearDown(self):
        super(CleanFilesTestCase, self).tearDown()
        self._delete_files()

    def _delete_files(self):
        """
        Remove every listed file and directory which currently exists.

        Removal is best effort - any exception raised while deleting a path is ignored.
        """
        for path in self.to_delete_files:
            if os.path.isfile(path):
                try:
                    os.remove(path)
                except Exception:
                    pass

        for path in self.to_delete_directories:
            if os.path.isdir(path):
                try:
                    shutil.rmtree(path)
                except Exception:
                    pass
395
396
397
class IntegrationTestCase(TestCase):
    """
    Base test class for integration tests to inherit from.

    It includes various utility functions and assert methods for working with processes.
    """

    # Set to True to print process stdout and stderr in tearDown after killing the processes
    # which are still alive
    print_stdout_stderr_on_teardown = False

    # Maps pid to the process object. Note that this dict lives on the class so it is shared
    # unless a subclass or an instance re-assigns it.
    processes = {}

    def tearDown(self):
        """
        Kill every tracked process, optionally print its output and stop tracking it.
        """
        super(IntegrationTestCase, self).tearDown()

        # Make sure we kill all the processes on teardown so they don't linger around if an
        # exception was thrown.
        # A snapshot is iterated because entries are removed inside the loop.
        for pid, process in list(self.processes.items()):
            try:
                process.kill()
            except OSError:
                # Process already exited or similar
                pass

            if self.print_stdout_stderr_on_teardown:
                # Reading the output is best effort, e.g. the stream attribute can be None
                try:
                    stdout = process.stdout.read()
                except Exception:
                    stdout = None

                try:
                    stderr = process.stderr.read()
                except Exception:
                    stderr = None

                print('Process "%s"' % (process.pid))
                print('Stdout: %s' % (stdout))
                print('Stderr: %s' % (stderr))

            # Forget the process so a later tearDown does not try to kill it again
            self.processes.pop(pid, None)

    def add_process(self, process):
        """
        Add a process to the local data structure to make sure it will get killed and cleaned up on
        tearDown.

        :param process: Object with a ``pid`` attribute.
        """
        self.processes[process.pid] = process

    def remove_process(self, process):
        """
        Remove process from a local data structure. Unknown processes are ignored.

        :param process: Object with a ``pid`` attribute.
        """
        self.processes.pop(process.pid, None)

    def assertProcessIsRunning(self, process):
        """
        Assert that a long running process provided Process object as returned by subprocess.Popen
        has successfully started and is running.

        The test fails with the exit code, stdout and stderr when ``poll()`` reports that the
        process has already exited.

        :raises ValueError: If ``process`` is falsy.
        """
        if not process:
            raise ValueError('process is None')

        return_code = process.poll()

        if return_code is None:
            # Still running
            return

        stdout = process.stdout.read() if process.stdout else ''
        stderr = process.stderr.read() if process.stderr else ''

        msg = ('Process exited with code=%s.\nStdout:\n%s\n\nStderr:\n%s' %
               (return_code, stdout, stderr))
        self.fail(msg)

    def assertProcessExited(self, proc):
        """
        Assert that the process status is "exited" or "zombie".

        A process for which ``status()`` raises ``psutil.NoSuchProcess`` counts as exited.

        :param proc: Object with ``status()`` and ``pid`` (presumably a ``psutil.Process`` -
                     confirm against callers).
        """
        try:
            status = proc.status()
        except psutil.NoSuchProcess:
            status = 'exited'

        if status not in ['exited', 'zombie']:
            self.fail('Process with pid "%s" is still running' % (proc.pid))
485
486
487
class WorkflowTestCase(DbTestCase):
    """
    Base class for workflow service tests to inherit from.
    """

    def get_wf_fixture_meta_data(self, fixture_pack_path, wf_meta_file_name):
        """
        Load a workflow meta file from the ``actions`` directory of a fixture pack.

        :param fixture_pack_path: Path to the fixture pack directory.
        :param wf_meta_file_name: Name of the meta file inside the ``actions`` directory.

        :return: Dict with ``file_name``, ``file_path``, ``content`` (the loaded meta file) and
                 ``name`` (``<pack>.<name>`` built from the content).
        """
        # Built with os.path.join, same as in get_wf_def, instead of hard-coding "/"
        wf_meta_file_path = os.path.join(fixture_pack_path, 'actions', wf_meta_file_name)
        wf_meta_content = loader.load_meta_file(wf_meta_file_path)
        wf_name = wf_meta_content['pack'] + '.' + wf_meta_content['name']

        return {
            'file_name': wf_meta_file_name,
            'file_path': wf_meta_file_path,
            'content': wf_meta_content,
            'name': wf_name
        }

    def get_wf_def(self, test_pack_path, wf_meta):
        """
        Read the workflow definition file referenced by the meta data ``entry_point``.

        :param test_pack_path: Path to the pack directory.
        :param wf_meta: Dict as returned by ``get_wf_fixture_meta_data``.

        :return: Content of the definition file.
        """
        rel_wf_def_path = wf_meta['content']['entry_point']
        abs_wf_def_path = os.path.join(test_pack_path, 'actions', rel_wf_def_path)

        with open(abs_wf_def_path, 'r') as def_file:
            return def_file.read()

    def prep_wf_ex(self, wf_ex_db):
        """
        Put the workflow execution and its start tasks into the running state and save it.

        :param wf_ex_db: Workflow execution DB object.

        :return: The object returned by the ``WorkflowExecution.update`` call.
        """
        data = {
            'spec': wf_ex_db.spec,
            'graph': wf_ex_db.graph,
            'state': wf_ex_db.status,
            'flow': wf_ex_db.flow,
            'input': wf_ex_db.input,
            'output': wf_ex_db.output,
            'errors': wf_ex_db.errors
        }

        conductor = conducting.WorkflowConductor.deserialize(data)
        conductor.set_workflow_state(wf_lib_states.RUNNING)

        for task in conductor.get_start_tasks():
            conductor.update_task_flow(task['id'], wf_lib_states.RUNNING)

        # Copy the conductor state back to the DB object; the update is done with publish=False
        wf_ex_db.status = conductor.get_workflow_state()
        wf_ex_db.flow = conductor.flow.serialize()
        wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

        return wf_ex_db

    def get_task_ex(self, task_id):
        """
        Return the first task execution found for the task id; fails the test if there is none.
        """
        task_ex_dbs = wf_db_access.TaskExecution.query(task_id=task_id)
        self.assertGreater(len(task_ex_dbs), 0)
        return task_ex_dbs[0]

    def get_action_exs(self, task_ex_id):
        """
        Return all the action executions of the task execution; fails the test if there is none.
        """
        ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=task_ex_id)
        self.assertGreater(len(ac_ex_dbs), 0)
        return ac_ex_dbs

    def get_action_ex(self, task_ex_id):
        """
        Return the action execution of the task execution; fails the test unless there is
        exactly one.
        """
        ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=task_ex_id)
        self.assertEqual(len(ac_ex_dbs), 1)
        return ac_ex_dbs[0]

    def run_workflow_step(self, wf_ex_db, task_id, ctx=None):
        """
        Request execution of a single task and assert that the action and the task succeeded.

        :param wf_ex_db: Workflow execution DB object.
        :param task_id: Id of the task to run.
        :param ctx: Optional context passed to the task execution request (defaults to ``{}``).
        """
        spec_module = specs_loader.get_spec_module(wf_ex_db.spec['catalog'])
        wf_spec = spec_module.WorkflowSpec.deserialize(wf_ex_db.spec)
        task_spec = wf_spec.tasks.get_task(task_id)
        st2_ctx = {'execution_id': wf_ex_db.action_execution}
        task_ex_db = wf_svc.request_task_execution(wf_ex_db, task_id, task_spec, ctx or {}, st2_ctx)
        ac_ex_db = self.get_action_ex(str(task_ex_db.id))
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        wf_svc.handle_action_execution_completion(ac_ex_db)
        # Re-read the task execution to pick up the status written by the completion handler
        task_ex_db = wf_db_access.TaskExecution.get_by_id(str(task_ex_db.id))
        self.assertEqual(task_ex_db.status, wf_lib_states.SUCCEEDED)

    def assert_task_not_started(self, task_id):
        """
        Assert that no task execution exists for the task id.
        """
        task_ex_dbs = wf_db_access.TaskExecution.query(task_id=task_id)
        self.assertEqual(len(task_ex_dbs), 0)

    def assert_task_running(self, task_id):
        """
        Assert that the first task execution for the task id is in the running state.
        """
        task_ex_db = self.get_task_ex(task_id)
        self.assertEqual(task_ex_db.task_id, task_id)
        self.assertEqual(task_ex_db.status, wf_lib_states.RUNNING)

    def assert_workflow_completed(self, wf_ex_id, state=None):
        """
        Assert that the workflow execution is in one of the completed states.

        :param wf_ex_id: Id of the workflow execution.
        :param state: Optional completed state the execution is expected to be in.
        """
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id)
        self.assertIn(wf_ex_db.status, wf_lib_states.COMPLETED_STATES)

        if state:
            self.assertIn(state, wf_lib_states.COMPLETED_STATES)
            self.assertEqual(wf_ex_db.status, state)
577
578
579
class FakeResponse(object):
    """
    Minimal stand-in for an HTTP response object for use in tests.
    """

    def __init__(self, text, status_code, reason):
        """
        :param text: Response body.
        :param status_code: Status code to expose on the response.
        :param reason: Reason to expose; also used as the ``raise_for_status`` error message.
        """
        self.text, self.status_code, self.reason = text, status_code, reason

    def json(self):
        """
        Return the response body parsed as JSON.
        """
        body = self.text
        return json.loads(body)

    def raise_for_status(self):
        """
        Always raise ``Exception`` with ``reason`` as the message, whatever the status code.
        """
        error = Exception(self.reason)
        raise error
591
592
593
def get_fixtures_path():
    """
    Return the path to the ``fixtures`` directory located next to this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'fixtures')
595
596
597
def get_resources_path():
    """
    Return the path to the ``resources`` directory located next to this module.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, 'resources')
599
600
601
def blocking_eventlet_spawn(func, *args, **kwargs):
    """
    Call ``func`` right away with the provided arguments and return a new ``mock.Mock``.

    The return value of ``func`` is discarded.
    """
    func(*args, **kwargs)
    placeholder = mock.Mock()
    return placeholder
604
605
606
# Utility function for mocking read_and_store_{stdout,stderr} functions
def make_mock_stream_readline(mock_stream, mock_data, stop_counter=1, sleep_delay=0):
    """
    Build a ``readline`` replacement which returns the items of ``mock_data`` one per call.

    :param mock_stream: Object used to hold the state. ``counter`` is reset to 0 here and
                        ``closed`` is set to True once the data has been served.
    :param mock_data: Sequence of lines to return.
    :param stop_counter: Maximum number of lines to return before the stream is marked closed.
    :param sleep_delay: Optional delay passed to ``eventlet.sleep`` at the start of every call.

    :return: Function which takes no arguments and returns the next line, or None once the
             stream has been marked as closed.
    """
    mock_stream.counter = 0

    def mock_stream_readline():
        if sleep_delay:
            eventlet.sleep(sleep_delay)

        # Also stop when mock_data runs out before stop_counter is reached, instead of
        # raising IndexError
        if mock_stream.counter >= min(stop_counter, len(mock_data)):
            mock_stream.closed = True
            return None

        line = mock_data[mock_stream.counter]
        mock_stream.counter += 1
        return line

    return mock_stream_readline
623