Test Failed
Push — master (e380d0...f5671d)
by W
02:58
created

st2tests/st2tests/base.py (2 issues)

# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import print_function

try:
    import simplejson as json
except ImportError:
    import json

import os
import os.path
import sys
import shutil
import logging

import six
import eventlet
import psutil
import mock
from oslo_config import cfg
from unittest2 import TestCase
import unittest2

from orquesta import conducting
from orquesta import events
from orquesta.specs import loader as specs_loader
from orquesta import states as wf_lib_states

from st2common.util.api import get_full_public_api_url
from st2common.constants import action as ac_const
from st2common.constants.runners import COMMON_ACTION_ENV_VARIABLES
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
from st2common.exceptions.db import StackStormDBObjectConflictError
from st2common.models.db import db_setup, db_teardown, db_ensure_indexes
from st2common.bootstrap.base import ResourceRegistrar
from st2common.bootstrap.configsregistrar import ConfigsRegistrar
from st2common.content.utils import get_packs_base_paths
from st2common.exceptions.db import StackStormDBObjectNotFoundError
import st2common.models.db.rule as rule_model
import st2common.models.db.rule_enforcement as rule_enforcement_model
import st2common.models.db.sensor as sensor_model
import st2common.models.db.trigger as trigger_model
import st2common.models.db.action as action_model
import st2common.models.db.keyvalue as keyvalue_model
import st2common.models.db.runner as runner_model
import st2common.models.db.execution as execution_model
import st2common.models.db.executionstate as executionstate_model
import st2common.models.db.liveaction as liveaction_model
import st2common.models.db.actionalias as actionalias_model
import st2common.models.db.policy as policy_model
from st2common.persistence import execution as ex_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.services import workflows as wf_svc
from st2common.util import api as api_util
from st2common.util import loader
import st2tests.config

# Imports for backward compatibility (those classes have been moved to standalone modules)
from st2tests.actions import BaseActionTestCase
from st2tests.sensors import BaseSensorTestCase
from st2tests.action_aliases import BaseActionAliasTestCase


__all__ = [
    'EventletTestCase',
    'DbTestCase',
    'DbModelTestCase',
    'CleanDbTestCase',
    'CleanFilesTestCase',
    'IntegrationTestCase',
    'RunnerTestCase',
    'WorkflowTestCase',

    # Pack test classes
    'BaseSensorTestCase',
    'BaseActionTestCase',
    'BaseActionAliasTestCase',

    'get_fixtures_path',
    'get_resources_path',

    'blocking_eventlet_spawn',
    'make_mock_stream_readline'
]

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

ALL_MODELS = []
ALL_MODELS.extend(rule_model.MODELS)
ALL_MODELS.extend(sensor_model.MODELS)
ALL_MODELS.extend(trigger_model.MODELS)
ALL_MODELS.extend(action_model.MODELS)
ALL_MODELS.extend(keyvalue_model.MODELS)
ALL_MODELS.extend(runner_model.MODELS)
ALL_MODELS.extend(execution_model.MODELS)
ALL_MODELS.extend(executionstate_model.MODELS)
ALL_MODELS.extend(liveaction_model.MODELS)
ALL_MODELS.extend(actionalias_model.MODELS)
ALL_MODELS.extend(policy_model.MODELS)
ALL_MODELS.extend(rule_enforcement_model.MODELS)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TESTS_CONFIG_PATH = os.path.join(BASE_DIR, '../conf/st2.conf')


class RunnerTestCase(unittest2.TestCase):
    def assertCommonSt2EnvVarsAvailableInEnv(self, env):
        """
        Method which asserts that the common ST2 environment variables are present in the provided
        environment.
        """
        for var_name in COMMON_ACTION_ENV_VARIABLES:
            self.assertTrue(var_name in env)

        self.assertEqual(env['ST2_ACTION_API_URL'], get_full_public_api_url())
        self.assertTrue(env[AUTH_TOKEN_ENV_VARIABLE_NAME] is not None)
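
# Example usage (illustrative sketch, not part of the original module): a runner test which
# builds an action environment and checks it with assertCommonSt2EnvVarsAvailableInEnv.
# It assumes the st2 test config has already been parsed so get_full_public_api_url() works.
#
#     class MyLocalRunnerTestCase(RunnerTestCase):
#         def test_common_env_vars(self):
#             env = {name: 'value' for name in COMMON_ACTION_ENV_VARIABLES}
#             env['ST2_ACTION_API_URL'] = get_full_public_api_url()
#             env[AUTH_TOKEN_ENV_VARIABLE_NAME] = 'token'
#             self.assertCommonSt2EnvVarsAvailableInEnv(env)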


class BaseTestCase(TestCase):

    # Review issue (Coding Style Best Practice): the first argument of the class method
    # _register_packs should be named cls.
    @classmethod
    def _register_packs(self):
        """
        Register all the packs inside the fixtures directory.
        """

        registrar = ResourceRegistrar(use_pack_cache=False)
        registrar.register_packs(base_dirs=get_packs_base_paths())

    # Review issue (Coding Style Best Practice): the first argument of the class method
    # _register_pack_configs should be named cls.
    @classmethod
    def _register_pack_configs(self, validate_configs=False):
        """
        Register configs for all the packs inside the fixtures directory.
        """
        registrar = ConfigsRegistrar(use_pack_cache=False, validate_configs=validate_configs)
        registrar.register_from_packs(base_dirs=get_packs_base_paths())


class EventletTestCase(TestCase):
    """
    Base test class which performs eventlet monkey patching before the tests run
    and un-patching after the tests have finished running.
    """

    @classmethod
    def setUpClass(cls):
        eventlet.monkey_patch(
            os=True,
            select=True,
            socket=True,
            thread=False if '--use-debugger' in sys.argv else True,
            time=True
        )

    @classmethod
    def tearDownClass(cls):
        eventlet.monkey_patch(
            os=False,
            select=False,
            socket=False,
            thread=False,
            time=False
        )
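
# Example usage (illustrative sketch, not part of the original module): a test case which
# relies on EventletTestCase to monkey patch the standard library before its tests run.
#
#     class MyGreenServiceTestCase(EventletTestCase):
#         def test_spawn(self):
#             results = []
#             eventlet.spawn(results.append, 1).wait()
#             self.assertEqual(results, [1])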


class BaseDbTestCase(BaseTestCase):
    # True to synchronously ensure indexes after db_setup is called - NOTE: This is only needed
    # with older MongoDB versions. With recent versions this is not needed for the tests anymore
    # and skipping it offers significant test speed-ups.
    ensure_indexes = False

    # A list of models to ensure indexes for when ensure_indexes is True. If not specified, it
    # defaults to all the models
    ensure_indexes_models = []

    # Set to True to enable printing of all the log messages to the console
    DISPLAY_LOG_MESSAGES = False

    @classmethod
    def setUpClass(cls):
        st2tests.config.parse_args()

        if cls.DISPLAY_LOG_MESSAGES:
            config_path = os.path.join(BASE_DIR, '../conf/logging.conf')
            logging.config.fileConfig(config_path,
                                      disable_existing_loggers=False)

    @classmethod
    def _establish_connection_and_re_create_db(cls):
        username = cfg.CONF.database.username if hasattr(cfg.CONF.database, 'username') else None
        password = cfg.CONF.database.password if hasattr(cfg.CONF.database, 'password') else None
        cls.db_connection = db_setup(
            cfg.CONF.database.db_name, cfg.CONF.database.host, cfg.CONF.database.port,
            username=username, password=password, ensure_indexes=False)

        cls._drop_collections()
        cls.db_connection.drop_database(cfg.CONF.database.db_name)

        # Explicitly ensure indexes after we re-create the DB, otherwise ensure_indexes could fail
        # inside db_setup if a test inserted invalid data.
        # NOTE: This is only needed in distributed scenarios (production deployments) where
        # multiple services can start up at the same time and race conditions are possible.
        if cls.ensure_indexes:
            if len(cls.ensure_indexes_models) == 0 or len(cls.ensure_indexes_models) > 1:
                msg = ('Ensuring indexes for all the models, this could significantly slow down '
                       'the tests')
                print('#' * len(msg), file=sys.stderr)
                print(msg, file=sys.stderr)
                print('#' * len(msg), file=sys.stderr)

            db_ensure_indexes(cls.ensure_indexes_models)

    @classmethod
    def _drop_db(cls):
        cls._drop_collections()

        if cls.db_connection is not None:
            cls.db_connection.drop_database(cfg.CONF.database.db_name)

        db_teardown()
        cls.db_connection = None

    @classmethod
    def _drop_collections(cls):
        # XXX: Explicitly drop all the collections. Otherwise, artifacts are left over in
        # subsequent tests.
        # See: https://github.com/MongoEngine/mongoengine/issues/566
        # See: https://github.com/MongoEngine/mongoengine/issues/565

        # NOTE: In older MongoDB versions you needed to drop all the collections prior to dropping
        # the database - that's not needed anymore with the WiredTiger engine

        # for model in ALL_MODELS:
        #     model.drop_collection()
        return


class DbTestCase(BaseDbTestCase):
    """
    This class drops and re-creates the database once per TestCase run.

    This means the database is only dropped once before all the tests from this class run, so
    data is persisted between different tests in this class.
    """

    db_connection = None
    current_result = None
    register_packs = False
    register_pack_configs = False

    @classmethod
    def setUpClass(cls):
        BaseDbTestCase.setUpClass()
        cls._establish_connection_and_re_create_db()

        if cls.register_packs:
            cls._register_packs()

        if cls.register_pack_configs:
            cls._register_pack_configs()

    @classmethod
    def tearDownClass(cls):
        drop_db = True

        if cls.current_result.errors or cls.current_result.failures:
            # Don't drop DB on test failure
            drop_db = False

        if drop_db:
            cls._drop_db()

    def run(self, result=None):
        # Remember result for use in tearDown and tearDownClass
        self.current_result = result
        self.__class__.current_result = result
        super(DbTestCase, self).run(result=result)
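
# Example usage (illustrative sketch, not part of the original module): a test case which
# shares a single database across its test methods and registers fixture packs and configs
# up front.
#
#     class MyActionServiceTestCase(DbTestCase):
#         register_packs = True
#         register_pack_configs = True
#
#         def test_first(self):
#             # Data written here is still visible in later tests of this class because the
#             # database is only re-created once per test case class.
#             ...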


class DbModelTestCase(DbTestCase):
    access_type = None

    @classmethod
    def setUpClass(cls):
        super(DbModelTestCase, cls).setUpClass()
        cls.db_type = cls.access_type.impl.model

    def _assert_fields_equal(self, a, b, exclude=None):
        exclude = exclude or []
        fields = {k: v for k, v in six.iteritems(self.db_type._fields) if k not in exclude}

        assert_funcs = {
            'mongoengine.fields.DictField': self.assertDictEqual,
            'mongoengine.fields.ListField': self.assertListEqual,
            'mongoengine.fields.SortedListField': self.assertListEqual
        }

        for k, v in six.iteritems(fields):
            assert_func = assert_funcs.get(str(v), self.assertEqual)
            assert_func(getattr(a, k, None), getattr(b, k, None))

    def _assert_values_equal(self, a, values=None):
        values = values or {}

        assert_funcs = {
            'dict': self.assertDictEqual,
            'list': self.assertListEqual
        }

        for k, v in six.iteritems(values):
            assert_func = assert_funcs.get(type(v).__name__, self.assertEqual)
            assert_func(getattr(a, k, None), v)

    def _assert_crud(self, instance, defaults=None, updates=None):
        # Assert instance is not already in the database.
        self.assertIsNone(getattr(instance, 'id', None))

        # Assert default values are assigned.
        self._assert_values_equal(instance, values=defaults)

        # Assert instance is created in the database.
        saved = self.access_type.add_or_update(instance)
        self.assertIsNotNone(saved.id)
        self._assert_fields_equal(instance, saved, exclude=['id'])
        retrieved = self.access_type.get_by_id(saved.id)
        self._assert_fields_equal(saved, retrieved)

        # Assert instance is updated in the database.
        for k, v in six.iteritems(updates or {}):
            setattr(instance, k, v)

        updated = self.access_type.add_or_update(instance)
        self._assert_fields_equal(instance, updated)

        # Assert instance is deleted from the database.
        retrieved = self.access_type.get_by_id(instance.id)
        retrieved.delete()
        self.assertRaises(StackStormDBObjectNotFoundError,
                          self.access_type.get_by_id, instance.id)

    def _assert_unique_key_constraint(self, instance):
        # Assert instance is not already in the database.
        self.assertIsNone(getattr(instance, 'id', None))

        # Assert instance is created in the database.
        saved = self.access_type.add_or_update(instance)
        self.assertIsNotNone(saved.id)

        # Assert an exception is thrown if we try to create the same instance again.
        delattr(instance, 'id')
        self.assertRaises(StackStormDBObjectConflictError,
                          self.access_type.add_or_update,
                          instance)
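
# Example usage (illustrative sketch, not part of the original module): a model CRUD test
# which points access_type at a persistence class. KeyValuePair and its field names are used
# here only as an assumed example.
#
#     from st2common.persistence.keyvalue import KeyValuePair
#     from st2common.models.db.keyvalue import KeyValuePairDB
#
#     class KeyValuePairModelTestCase(DbModelTestCase):
#         access_type = KeyValuePair
#
#         def test_crud(self):
#             instance = KeyValuePairDB(name='key1', value='one')
#             self._assert_crud(instance, defaults={'name': 'key1'}, updates={'value': 'two'})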


class CleanDbTestCase(BaseDbTestCase):
    """
    Class which ensures the database is re-created before running each test method.

    This means each test inside this class is self-contained and starts with a clean (empty)
    database.
    """

    register_packs = False
    register_pack_configs = False

    def setUp(self):
        self._establish_connection_and_re_create_db()

        if self.register_packs:
            self._register_packs()

        if self.register_pack_configs:
            self._register_pack_configs()
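
# Example usage (illustrative sketch, not part of the original module): each test method
# below starts with a freshly re-created (empty) database, so queries against it come back
# empty. The get_all() call is assumed from the st2common persistence layer.
#
#     class MyIsolatedDbTestCase(CleanDbTestCase):
#         def test_starts_empty(self):
#             self.assertEqual(len(ex_db_access.ActionExecution.get_all()), 0)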


class CleanFilesTestCase(TestCase):
    """
    Base test class which deletes specified files and directories on setUp and tearDown.
    """
    to_delete_files = []
    to_delete_directories = []

    def setUp(self):
        super(CleanFilesTestCase, self).setUp()
        self._delete_files()

    def tearDown(self):
        super(CleanFilesTestCase, self).tearDown()
        self._delete_files()

    def _delete_files(self):
        for file_path in self.to_delete_files:
            if not os.path.isfile(file_path):
                continue

            try:
                os.remove(file_path)
            except Exception:
                pass

        for file_path in self.to_delete_directories:
            if not os.path.isdir(file_path):
                continue

            try:
                shutil.rmtree(file_path)
            except Exception:
                pass
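
# Example usage (illustrative sketch, not part of the original module): temporary files
# listed in to_delete_files are removed both before and after every test method. The path
# used here is only an example.
#
#     class MyTempFileTestCase(CleanFilesTestCase):
#         to_delete_files = ['/tmp/st2-test-output.json']
#
#         def test_writes_output(self):
#             with open('/tmp/st2-test-output.json', 'w') as fp:
#                 fp.write(json.dumps({'ok': True}))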


class IntegrationTestCase(TestCase):
    """
    Base test class for integration tests to inherit from.

    It includes various utility functions and assert methods for working with processes.
    """

    # Set to True to print process stdout and stderr in tearDown after killing the processes
    # which are still alive
    print_stdout_stderr_on_teardown = False

    processes = {}

    def tearDown(self):
        super(IntegrationTestCase, self).tearDown()

        # Make sure we kill all the processes on teardown so they don't linger around if an
        # exception was thrown.
        for pid, process in self.processes.items():

            try:
                process.kill()
            except OSError:
                # Process already exited or similar
                pass

            if self.print_stdout_stderr_on_teardown:
                try:
                    stdout = process.stdout.read()
                except:
                    stdout = None

                try:
                    stderr = process.stderr.read()
                except:
                    stderr = None

                print('Process "%s"' % (process.pid))
                print('Stdout: %s' % (stdout))
                print('Stderr: %s' % (stderr))

    def add_process(self, process):
        """
        Add a process to the local data structure to make sure it will get killed and cleaned up on
        tearDown.
        """
        self.processes[process.pid] = process

    def remove_process(self, process):
        """
        Remove a process from the local data structure.
        """
        if process.pid in self.processes:
            del self.processes[process.pid]

    def assertProcessIsRunning(self, process):
        """
        Assert that the provided long running process (a Process object as returned by
        subprocess.Popen) has successfully started and is running.
        """
        if not process:
            raise ValueError('process is None')

        return_code = process.poll()

        if return_code is not None:
            if process.stdout:
                stdout = process.stdout.read()
            else:
                stdout = ''

            if process.stderr:
                stderr = process.stderr.read()
            else:
                stderr = ''

            msg = ('Process exited with code=%s.\nStdout:\n%s\n\nStderr:\n%s' %
                   (return_code, stdout, stderr))
            self.fail(msg)

    def assertProcessExited(self, proc):
        try:
            status = proc.status()
        except psutil.NoSuchProcess:
            status = 'exited'

        if status not in ['exited', 'zombie']:
            self.fail('Process with pid "%s" is still running' % (proc.pid))
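
# Example usage (illustrative sketch, not part of the original module): start a long running
# process, register it for cleanup on tearDown and assert that it stays up.
#
#     import subprocess
#
#     class MyServiceIntegrationTestCase(IntegrationTestCase):
#         def test_service_starts(self):
#             process = subprocess.Popen(['sleep', '60'], stdout=subprocess.PIPE,
#                                        stderr=subprocess.PIPE)
#             self.add_process(process)
#             self.assertProcessIsRunning(process)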


class WorkflowTestCase(DbTestCase):
    """
    Base class for workflow service tests to inherit from.
    """

    def get_wf_fixture_meta_data(self, fixture_pack_path, wf_meta_file_name):
        wf_meta_file_path = fixture_pack_path + '/actions/' + wf_meta_file_name
        wf_meta_content = loader.load_meta_file(wf_meta_file_path)
        wf_name = wf_meta_content['pack'] + '.' + wf_meta_content['name']

        return {
            'file_name': wf_meta_file_name,
            'file_path': wf_meta_file_path,
            'content': wf_meta_content,
            'name': wf_name
        }

    def get_wf_def(self, test_pack_path, wf_meta):
        rel_wf_def_path = wf_meta['content']['entry_point']
        abs_wf_def_path = os.path.join(test_pack_path, 'actions', rel_wf_def_path)

        with open(abs_wf_def_path, 'r') as def_file:
            return def_file.read()

    def mock_st2_context(self, ac_ex_db, context=None):
        st2_ctx = {
            'st2': {
                'api_url': api_util.get_full_public_api_url(),
                'action_execution_id': str(ac_ex_db.id)
            }
        }

        if context:
            st2_ctx['parent'] = context

        return st2_ctx

    def prep_wf_ex(self, wf_ex_db):
        data = {
            'spec': wf_ex_db.spec,
            'graph': wf_ex_db.graph,
            'state': wf_ex_db.status,
            'flow': wf_ex_db.flow,
            'context': wf_ex_db.context,
            'input': wf_ex_db.input,
            'output': wf_ex_db.output,
            'errors': wf_ex_db.errors
        }

        conductor = conducting.WorkflowConductor.deserialize(data)
        conductor.request_workflow_state(wf_lib_states.RUNNING)

        for task in conductor.get_start_tasks():
            ac_ex_event = events.ActionExecutionEvent(wf_lib_states.RUNNING)
            conductor.update_task_flow(task['id'], ac_ex_event)

        wf_ex_db.status = conductor.get_workflow_state()
        wf_ex_db.flow = conductor.flow.serialize()
        wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

        return wf_ex_db

    def get_task_ex(self, task_id):
        task_ex_dbs = wf_db_access.TaskExecution.query(task_id=task_id)
        self.assertGreater(len(task_ex_dbs), 0)
        return task_ex_dbs[0]

    def get_action_exs(self, task_ex_id):
        ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=task_ex_id)
        self.assertGreater(len(ac_ex_dbs), 0)
        return ac_ex_dbs

    def get_action_ex(self, task_ex_id):
        ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=task_ex_id)
        self.assertEqual(len(ac_ex_dbs), 1)
        return ac_ex_dbs[0]

    def run_workflow_step(self, wf_ex_db, task_id, ctx=None):
        spec_module = specs_loader.get_spec_module(wf_ex_db.spec['catalog'])
        wf_spec = spec_module.WorkflowSpec.deserialize(wf_ex_db.spec)
        task_spec = wf_spec.tasks.get_task(task_id)
        st2_ctx = {'execution_id': wf_ex_db.action_execution}
        task_ex_db = wf_svc.request_task_execution(wf_ex_db, task_id, task_spec, ctx or {}, st2_ctx)
        ac_ex_db = self.get_action_ex(str(task_ex_db.id))
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        wf_svc.handle_action_execution_completion(ac_ex_db)
        task_ex_db = wf_db_access.TaskExecution.get_by_id(str(task_ex_db.id))
        self.assertEqual(task_ex_db.status, wf_lib_states.SUCCEEDED)

    def assert_task_not_started(self, task_id):
        task_ex_dbs = wf_db_access.TaskExecution.query(task_id=task_id)
        self.assertEqual(len(task_ex_dbs), 0)

    def assert_task_running(self, task_id):
        task_ex_db = self.get_task_ex(task_id)
        self.assertEqual(task_ex_db.task_id, task_id)
        self.assertEqual(task_ex_db.status, wf_lib_states.RUNNING)

    def assert_workflow_completed(self, wf_ex_id, state=None):
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id)
        self.assertIn(wf_ex_db.status, wf_lib_states.COMPLETED_STATES)

        if state:
            self.assertIn(state, wf_lib_states.COMPLETED_STATES)
            self.assertEqual(wf_ex_db.status, state)
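
# Example usage (illustrative sketch, not part of the original module): load workflow metadata
# from a fixture pack. The pack directory 'packs/orquesta_tests' and the file name
# 'sequential.yaml' are assumptions made for the sake of the example.
#
#     class MySequentialWorkflowTestCase(WorkflowTestCase):
#         def test_fixture_meta_data(self):
#             pack_path = os.path.join(get_fixtures_path(), 'packs/orquesta_tests')
#             wf_meta = self.get_wf_fixture_meta_data(pack_path, 'sequential.yaml')
#             self.assertEqual(wf_meta['file_name'], 'sequential.yaml')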


class FakeResponse(object):

    def __init__(self, text, status_code, reason):
        self.text = text
        self.status_code = status_code
        self.reason = reason

    def json(self):
        return json.loads(self.text)

    def raise_for_status(self):
        raise Exception(self.reason)


def get_fixtures_path():
    return os.path.join(os.path.dirname(__file__), 'fixtures')


def get_resources_path():
    return os.path.join(os.path.dirname(__file__), 'resources')


def blocking_eventlet_spawn(func, *args, **kwargs):
    func(*args, **kwargs)
    return mock.Mock()


# Utility function for mocking read_and_store_{stdout,stderr} functions
def make_mock_stream_readline(mock_stream, mock_data, stop_counter=1, sleep_delay=0):
    mock_stream.counter = 0

    def mock_stream_readline():
        if sleep_delay:
            eventlet.sleep(sleep_delay)

        if mock_stream.counter >= stop_counter:
            mock_stream.closed = True
            return

        line = mock_data[mock_stream.counter]
        mock_stream.counter += 1
        return line

    return mock_stream_readline
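
# Example usage (illustrative sketch, not part of the original module): replace a stream's
# readline with a mock which feeds pre-canned lines and then marks the stream as closed.
#
#     mock_stream = mock.Mock()
#     mock_stream.closed = False
#     readline = make_mock_stream_readline(mock_stream, ['line 1\n', 'line 2\n'], stop_counter=2)
#     mock_stream.readline = readline
#
#     assert readline() == 'line 1\n'
#     assert readline() == 'line 2\n'
#     assert readline() is None and mock_stream.closed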