Test Failed
Push — master ( a52b43...69f2d4 )
by W
01:58
created

WorkflowTestCase.mock_st2_context()   A

Complexity: Conditions 2
Size: Total Lines 12
Duplication: Lines 0, Ratio 0 %
Importance: Changes 1, Bugs 0, Features 0

Metric   Value
cc       2
c        1
b        0
f        0
dl       0
loc      12
rs       9.4285
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
try:
    import simplejson as json
except ImportError:
    import json

import os
import os.path
import sys
import shutil
import logging

import six
import eventlet
import psutil
import mock
from oslo_config import cfg
from unittest2 import TestCase
import unittest2

from orchestra import conducting
from orchestra.specs import loader as specs_loader
from orchestra import states as wf_lib_states

from st2common.util.api import get_full_public_api_url
from st2common.constants import action as ac_const
from st2common.constants.runners import COMMON_ACTION_ENV_VARIABLES
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
from st2common.exceptions.db import StackStormDBObjectConflictError
from st2common.models.db import db_setup, db_teardown, db_ensure_indexes
from st2common.bootstrap.base import ResourceRegistrar
from st2common.bootstrap.configsregistrar import ConfigsRegistrar
from st2common.content.utils import get_packs_base_paths
from st2common.exceptions.db import StackStormDBObjectNotFoundError
import st2common.models.db.rule as rule_model
import st2common.models.db.rule_enforcement as rule_enforcement_model
import st2common.models.db.sensor as sensor_model
import st2common.models.db.trigger as trigger_model
import st2common.models.db.action as action_model
import st2common.models.db.keyvalue as keyvalue_model
import st2common.models.db.runner as runner_model
import st2common.models.db.execution as execution_model
import st2common.models.db.executionstate as executionstate_model
import st2common.models.db.liveaction as liveaction_model
import st2common.models.db.actionalias as actionalias_model
import st2common.models.db.policy as policy_model
from st2common.persistence import execution as ex_db_access
from st2common.persistence import workflow as wf_db_access
from st2common.services import workflows as wf_svc
from st2common.util import api as api_util
from st2common.util import loader
import st2tests.config

# Imports for backward compatibility (those classes have been moved to standalone modules)
from st2tests.actions import BaseActionTestCase
from st2tests.sensors import BaseSensorTestCase
from st2tests.action_aliases import BaseActionAliasTestCase


__all__ = [
    'EventletTestCase',
    'DbTestCase',
    'DbModelTestCase',
    'CleanDbTestCase',
    'CleanFilesTestCase',
    'IntegrationTestCase',
    'RunnerTestCase',
    'WorkflowTestCase',

    # Pack test classes
    'BaseSensorTestCase',
    'BaseActionTestCase',
    'BaseActionAliasTestCase',

    'get_fixtures_path',
    'get_resources_path',

    'blocking_eventlet_spawn',
    'make_mock_stream_readline'
]

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

ALL_MODELS = []
ALL_MODELS.extend(rule_model.MODELS)
ALL_MODELS.extend(sensor_model.MODELS)
ALL_MODELS.extend(trigger_model.MODELS)
ALL_MODELS.extend(action_model.MODELS)
ALL_MODELS.extend(keyvalue_model.MODELS)
ALL_MODELS.extend(runner_model.MODELS)
ALL_MODELS.extend(execution_model.MODELS)
ALL_MODELS.extend(executionstate_model.MODELS)
ALL_MODELS.extend(liveaction_model.MODELS)
ALL_MODELS.extend(actionalias_model.MODELS)
ALL_MODELS.extend(policy_model.MODELS)
ALL_MODELS.extend(rule_enforcement_model.MODELS)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TESTS_CONFIG_PATH = os.path.join(BASE_DIR, '../conf/st2.conf')

class RunnerTestCase(unittest2.TestCase):
    def assertCommonSt2EnvVarsAvailableInEnv(self, env):
        """
        Assert that the common ST2 environment variables are present in the provided
        environment.
        """
        for var_name in COMMON_ACTION_ENV_VARIABLES:
            self.assertTrue(var_name in env)

        self.assertEqual(env['ST2_ACTION_API_URL'], get_full_public_api_url())
        self.assertTrue(env[AUTH_TOKEN_ENV_VARIABLE_NAME] is not None)

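# Example usage sketch (illustrative only, not part of the original module; the env
# values are hypothetical placeholders). A runner test typically builds the environment
# dict its runner passes to the action and then reuses the assertion helper above.
class ExampleRunnerTestCase(RunnerTestCase):
    def setUp(self):
        # The helper compares against get_full_public_api_url(), which reads the parsed
        # st2 config, so load the test configuration first.
        st2tests.config.parse_args()

    def test_common_env_vars_are_present(self):
        env = {var_name: 'dummy-value' for var_name in COMMON_ACTION_ENV_VARIABLES}
        env['ST2_ACTION_API_URL'] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = 'dummy-token'
        self.assertCommonSt2EnvVarsAvailableInEnv(env)
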
class BaseTestCase(TestCase):

    @classmethod
    def _register_packs(self):
        # Review note (coding style): the first argument of the class method
        # _register_packs should be named cls.
        """
        Register all the packs inside the fixtures directory.
        """

        registrar = ResourceRegistrar(use_pack_cache=False)
        registrar.register_packs(base_dirs=get_packs_base_paths())

    @classmethod
    def _register_pack_configs(self, validate_configs=False):
        # Review note (coding style): the first argument of the class method
        # _register_pack_configs should be named cls.
        """
        Register all the pack configs inside the fixtures directory.
        """
        registrar = ConfigsRegistrar(use_pack_cache=False, validate_configs=validate_configs)
        registrar.register_from_packs(base_dirs=get_packs_base_paths())

class EventletTestCase(TestCase):
    """
    Base test class which performs eventlet monkey patching before the tests run
    and un-patching after the tests have finished running.
    """

    @classmethod
    def setUpClass(cls):
        eventlet.monkey_patch(
            os=True,
            select=True,
            socket=True,
            thread=False if '--use-debugger' in sys.argv else True,
            time=True
        )

    @classmethod
    def tearDownClass(cls):
        eventlet.monkey_patch(
            os=False,
            select=False,
            socket=False,
            thread=False,
            time=False
        )

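# Example usage sketch (illustrative only, not part of the original module). Tests that
# depend on the patched standard library just subclass EventletTestCase; setUpClass
# above has already monkey patched os/select/socket/time before any test method runs.
class ExampleEventletTestCase(EventletTestCase):
    def test_green_thread_runs(self):
        results = []
        green_thread = eventlet.spawn(results.append, 'done')
        green_thread.wait()
        self.assertEqual(results, ['done'])
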
class BaseDbTestCase(BaseTestCase):

    # Set to True to enable printing of all the log messages to the console
    DISPLAY_LOG_MESSAGES = False

    @classmethod
    def setUpClass(cls):
        st2tests.config.parse_args()

        if cls.DISPLAY_LOG_MESSAGES:
            config_path = os.path.join(BASE_DIR, '../conf/logging.conf')
            logging.config.fileConfig(config_path,
                                      disable_existing_loggers=False)

    @classmethod
    def _establish_connection_and_re_create_db(cls):
        username = cfg.CONF.database.username if hasattr(cfg.CONF.database, 'username') else None
        password = cfg.CONF.database.password if hasattr(cfg.CONF.database, 'password') else None
        cls.db_connection = db_setup(
            cfg.CONF.database.db_name, cfg.CONF.database.host, cfg.CONF.database.port,
            username=username, password=password, ensure_indexes=False)
        cls._drop_collections()
        cls.db_connection.drop_database(cfg.CONF.database.db_name)

        # Explicitly ensure indexes after we re-create the DB, otherwise ensure_indexes
        # could fail inside db_setup if a test inserted invalid data
        db_ensure_indexes()

    @classmethod
    def _drop_db(cls):
        cls._drop_collections()
        if cls.db_connection is not None:
            cls.db_connection.drop_database(cfg.CONF.database.db_name)
        db_teardown()
        cls.db_connection = None

    @classmethod
    def _drop_collections(cls):
        # XXX: Explicitly drop all the collections. Otherwise, artifacts are left over in
        # subsequent tests.
        # See: https://github.com/MongoEngine/mongoengine/issues/566
        # See: https://github.com/MongoEngine/mongoengine/issues/565
        global ALL_MODELS
        # Review note (unused code): ALL_MODELS is imported from the global scope but is
        # never written to, so the global statement is not needed here.
        for model in ALL_MODELS:
            model.drop_collection()

class DbTestCase(BaseDbTestCase):
    """
    This class drops and re-creates the database once per TestCase run.

    This means the database is only dropped once before all the tests from this class run
    and data is persisted between different tests in this class.
    """

    db_connection = None
    current_result = None
    register_packs = False
    register_pack_configs = False

    @classmethod
    def setUpClass(cls):
        BaseDbTestCase.setUpClass()
        cls._establish_connection_and_re_create_db()

        if cls.register_packs:
            cls._register_packs()

        if cls.register_pack_configs:
            cls._register_pack_configs()

    @classmethod
    def tearDownClass(cls):
        drop_db = True

        if cls.current_result.errors or cls.current_result.failures:
            # Don't drop DB on test failure
            drop_db = False

        if drop_db:
            cls._drop_db()

    def run(self, result=None):
        # Remember result for use in tearDown and tearDownClass
        self.current_result = result
        self.__class__.current_result = result
        super(DbTestCase, self).run(result=result)

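# Example usage sketch (illustrative only, not part of the original module). Subclasses
# opt into pack / pack config registration via the class attributes; the MongoDB
# connection is established once in setUpClass and shared by every test in the class.
class ExampleDbTestCase(DbTestCase):
    register_packs = False

    def test_db_connection_is_established(self):
        self.assertIsNotNone(self.db_connection)
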
class DbModelTestCase(DbTestCase):
    access_type = None

    @classmethod
    def setUpClass(cls):
        super(DbModelTestCase, cls).setUpClass()
        cls.db_type = cls.access_type.impl.model

    def _assert_fields_equal(self, a, b, exclude=None):
        exclude = exclude or []
        fields = {k: v for k, v in six.iteritems(self.db_type._fields) if k not in exclude}

        assert_funcs = {
            'mongoengine.fields.DictField': self.assertDictEqual,
            'mongoengine.fields.ListField': self.assertListEqual,
            'mongoengine.fields.SortedListField': self.assertListEqual
        }

        for k, v in six.iteritems(fields):
            assert_func = assert_funcs.get(str(v), self.assertEqual)
            assert_func(getattr(a, k, None), getattr(b, k, None))

    def _assert_values_equal(self, a, values=None):
        values = values or {}

        assert_funcs = {
            'dict': self.assertDictEqual,
            'list': self.assertListEqual
        }

        for k, v in six.iteritems(values):
            assert_func = assert_funcs.get(type(v).__name__, self.assertEqual)
            assert_func(getattr(a, k, None), v)

    def _assert_crud(self, instance, defaults=None, updates=None):
        # Assert instance is not already in the database.
        self.assertIsNone(getattr(instance, 'id', None))

        # Assert default values are assigned.
        self._assert_values_equal(instance, values=defaults)

        # Assert instance is created in the database.
        saved = self.access_type.add_or_update(instance)
        self.assertIsNotNone(saved.id)
        self._assert_fields_equal(instance, saved, exclude=['id'])
        retrieved = self.access_type.get_by_id(saved.id)
        self._assert_fields_equal(saved, retrieved)

        # Assert instance is updated in the database.
        for k, v in six.iteritems(updates or {}):
            setattr(instance, k, v)

        updated = self.access_type.add_or_update(instance)
        self._assert_fields_equal(instance, updated)

        # Assert instance is deleted from the database.
        retrieved = self.access_type.get_by_id(instance.id)
        retrieved.delete()
        self.assertRaises(StackStormDBObjectNotFoundError,
                          self.access_type.get_by_id, instance.id)

    def _assert_unique_key_constraint(self, instance):
        # Assert instance is not already in the database.
        self.assertIsNone(getattr(instance, 'id', None))

        # Assert instance is created in the database.
        saved = self.access_type.add_or_update(instance)
        self.assertIsNotNone(saved.id)

        # Assert an exception is thrown if we try to create the same instance again.
        delattr(instance, 'id')
        self.assertRaises(StackStormDBObjectConflictError,
                          self.access_type.add_or_update,
                          instance)

class CleanDbTestCase(BaseDbTestCase):
    """
    Class which ensures the database is re-created before running each test method.

    This means each test inside this class is self-sustained and starts with a clean (empty)
    database.
    """

    register_packs = False
    register_pack_configs = False

    def setUp(self):
        self._establish_connection_and_re_create_db()

        if self.register_packs:
            self._register_packs()

        if self.register_pack_configs:
            self._register_pack_configs()

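# Example usage sketch (illustrative only, not part of the original module). Because
# setUp above re-creates the database before every test method, each test can assume an
# empty set of collections (the query below assumes no fixtures have been registered).
class ExampleCleanDbTestCase(CleanDbTestCase):
    def test_starts_with_an_empty_database(self):
        self.assertEqual(len(ex_db_access.ActionExecution.get_all()), 0)
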
class CleanFilesTestCase(TestCase):
    """
    Base test class which deletes specified files and directories on setUp and tearDown.
    """
    to_delete_files = []
    to_delete_directories = []

    def setUp(self):
        super(CleanFilesTestCase, self).setUp()
        self._delete_files()

    def tearDown(self):
        super(CleanFilesTestCase, self).tearDown()
        self._delete_files()

    def _delete_files(self):
        for file_path in self.to_delete_files:
            if not os.path.isfile(file_path):
                continue

            try:
                os.remove(file_path)
            except Exception:
                pass

        for file_path in self.to_delete_directories:
            if not os.path.isdir(file_path):
                continue

            try:
                shutil.rmtree(file_path)
            except Exception:
                pass

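# Example usage sketch (illustrative only, not part of the original module; the scratch
# path below is hypothetical). Files listed in to_delete_files are removed both before
# and after every test, so tests can write to them freely without leaking state.
class ExampleCleanFilesTestCase(CleanFilesTestCase):
    to_delete_files = ['/tmp/st2-clean-files-example.txt']

    def test_scratch_file_is_cleaned_up(self):
        with open(self.to_delete_files[0], 'w') as fp:
            fp.write('scratch data')

        self.assertTrue(os.path.isfile(self.to_delete_files[0]))
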
class IntegrationTestCase(TestCase):
    """
    Base test class for integration tests to inherit from.

    It includes various utility functions and assert methods for working with processes.
    """

    # Set to True to print process stdout and stderr in tearDown after killing the processes
    # which are still alive
    print_stdout_stderr_on_teardown = False

    processes = {}

    def tearDown(self):
        super(IntegrationTestCase, self).tearDown()

        # Make sure we kill all the processes on teardown so they don't linger around if an
        # exception was thrown.
        for pid, process in self.processes.items():

            try:
                process.kill()
            except OSError:
                # Process already exited or similar
                pass

            if self.print_stdout_stderr_on_teardown:
                try:
                    stdout = process.stdout.read()
                except:
                    stdout = None

                try:
                    stderr = process.stderr.read()
                except:
                    stderr = None

                print('Process "%s"' % (process.pid))
                print('Stdout: %s' % (stdout))
                print('Stderr: %s' % (stderr))

    def add_process(self, process):
        """
        Add a process to the local data structure to make sure it will get killed and cleaned up on
        tearDown.
        """
        self.processes[process.pid] = process

    def remove_process(self, process):
        """
        Remove a process from the local data structure.
        """
        if process.pid in self.processes:
            del self.processes[process.pid]

    def assertProcessIsRunning(self, process):
        """
        Assert that the provided long-running process (a Process object as returned by
        subprocess.Popen) has successfully started and is running.
        """
        if not process:
            raise ValueError('process is None')

        return_code = process.poll()

        if return_code is not None:
            if process.stdout:
                stdout = process.stdout.read()
            else:
                stdout = ''

            if process.stderr:
                stderr = process.stderr.read()
            else:
                stderr = ''

            msg = ('Process exited with code=%s.\nStdout:\n%s\n\nStderr:\n%s' %
                   (return_code, stdout, stderr))
            self.fail(msg)

    def assertProcessExited(self, proc):
        try:
            status = proc.status()
        except psutil.NoSuchProcess:
            status = 'exited'

        if status not in ['exited', 'zombie']:
            self.fail('Process with pid "%s" is still running' % (proc.pid))

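# Example usage sketch (illustrative only, not part of the original module; subprocess
# is imported locally because this module does not import it). Registering the spawned
# process with add_process guarantees it gets killed in tearDown even if the test fails
# part way through.
class ExampleIntegrationTestCase(IntegrationTestCase):
    def test_long_running_process(self):
        import subprocess

        process = subprocess.Popen(['sleep', '10'])
        self.add_process(process=process)
        self.assertProcessIsRunning(process=process)
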
class WorkflowTestCase(DbTestCase):
    """
    Base class for workflow service tests to inherit from.
    """

    def get_wf_fixture_meta_data(self, fixture_pack_path, wf_meta_file_name):
        wf_meta_file_path = fixture_pack_path + '/actions/' + wf_meta_file_name
        wf_meta_content = loader.load_meta_file(wf_meta_file_path)
        wf_name = wf_meta_content['pack'] + '.' + wf_meta_content['name']

        return {
            'file_name': wf_meta_file_name,
            'file_path': wf_meta_file_path,
            'content': wf_meta_content,
            'name': wf_name
        }

    def get_wf_def(self, test_pack_path, wf_meta):
        rel_wf_def_path = wf_meta['content']['entry_point']
        abs_wf_def_path = os.path.join(test_pack_path, 'actions', rel_wf_def_path)

        with open(abs_wf_def_path, 'r') as def_file:
            return def_file.read()

    def mock_st2_context(self, ac_ex_db, context=None):
        st2_ctx = {
            'st2': {
                'api_url': api_util.get_full_public_api_url(),
                'action_execution_id': str(ac_ex_db.id)
            }
        }

        if context:
            st2_ctx['parent'] = context

        return st2_ctx

    def prep_wf_ex(self, wf_ex_db):
        data = {
            'spec': wf_ex_db.spec,
            'graph': wf_ex_db.graph,
            'state': wf_ex_db.status,
            'flow': wf_ex_db.flow,
            'context': wf_ex_db.context,
            'input': wf_ex_db.input,
            'output': wf_ex_db.output,
            'errors': wf_ex_db.errors
        }

        conductor = conducting.WorkflowConductor.deserialize(data)
        conductor.set_workflow_state(wf_lib_states.RUNNING)

        for task in conductor.get_start_tasks():
            conductor.update_task_flow(task['id'], wf_lib_states.RUNNING)

        wf_ex_db.status = conductor.get_workflow_state()
        wf_ex_db.flow = conductor.flow.serialize()
        wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

        return wf_ex_db

    def get_task_ex(self, task_id):
        task_ex_dbs = wf_db_access.TaskExecution.query(task_id=task_id)
        self.assertGreater(len(task_ex_dbs), 0)
        return task_ex_dbs[0]

    def get_action_exs(self, task_ex_id):
        ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=task_ex_id)
        self.assertGreater(len(ac_ex_dbs), 0)
        return ac_ex_dbs

    def get_action_ex(self, task_ex_id):
        ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=task_ex_id)
        self.assertEqual(len(ac_ex_dbs), 1)
        return ac_ex_dbs[0]

    def run_workflow_step(self, wf_ex_db, task_id, ctx=None):
        spec_module = specs_loader.get_spec_module(wf_ex_db.spec['catalog'])
        wf_spec = spec_module.WorkflowSpec.deserialize(wf_ex_db.spec)
        task_spec = wf_spec.tasks.get_task(task_id)
        st2_ctx = {'execution_id': wf_ex_db.action_execution}
        task_ex_db = wf_svc.request_task_execution(wf_ex_db, task_id, task_spec, ctx or {}, st2_ctx)
        ac_ex_db = self.get_action_ex(str(task_ex_db.id))
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        wf_svc.handle_action_execution_completion(ac_ex_db)
        task_ex_db = wf_db_access.TaskExecution.get_by_id(str(task_ex_db.id))
        self.assertEqual(task_ex_db.status, wf_lib_states.SUCCEEDED)

    def assert_task_not_started(self, task_id):
        task_ex_dbs = wf_db_access.TaskExecution.query(task_id=task_id)
        self.assertEqual(len(task_ex_dbs), 0)

    def assert_task_running(self, task_id):
        task_ex_db = self.get_task_ex(task_id)
        self.assertEqual(task_ex_db.task_id, task_id)
        self.assertEqual(task_ex_db.status, wf_lib_states.RUNNING)

    def assert_workflow_completed(self, wf_ex_id, state=None):
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id)
        self.assertIn(wf_ex_db.status, wf_lib_states.COMPLETED_STATES)

        if state:
            self.assertIn(state, wf_lib_states.COMPLETED_STATES)
            self.assertEqual(wf_ex_db.status, state)

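# Example usage sketch (illustrative only, not part of the original module; the
# execution id is made up). mock_st2_context just wraps the public API URL and the
# action execution id into the 'st2' context dict and attaches any caller-supplied
# context under 'parent'.
class ExampleWorkflowTestCase(WorkflowTestCase):
    def test_mock_st2_context(self):
        fake_ac_ex_db = mock.Mock(id='0123456789abcdef01234567')
        st2_ctx = self.mock_st2_context(fake_ac_ex_db, context={'user': 'stanley'})

        self.assertEqual(st2_ctx['st2']['action_execution_id'], '0123456789abcdef01234567')
        self.assertEqual(st2_ctx['st2']['api_url'], api_util.get_full_public_api_url())
        self.assertEqual(st2_ctx['parent'], {'user': 'stanley'})
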
class FakeResponse(object):

    def __init__(self, text, status_code, reason):
        self.text = text
        self.status_code = status_code
        self.reason = reason

    def json(self):
        return json.loads(self.text)

    def raise_for_status(self):
        raise Exception(self.reason)


def get_fixtures_path():
    return os.path.join(os.path.dirname(__file__), 'fixtures')


def get_resources_path():
    return os.path.join(os.path.dirname(__file__), 'resources')


def blocking_eventlet_spawn(func, *args, **kwargs):
    func(*args, **kwargs)
    return mock.Mock()


# Utility function for mocking read_and_store_{stdout,stderr} functions
def make_mock_stream_readline(mock_stream, mock_data, stop_counter=1, sleep_delay=0):
    mock_stream.counter = 0

    def mock_stream_readline():
        if sleep_delay:
            eventlet.sleep(sleep_delay)

        if mock_stream.counter >= stop_counter:
            mock_stream.closed = True
            return

        line = mock_data[mock_stream.counter]
        mock_stream.counter += 1
        return line

    return mock_stream_readline

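# Example usage sketch (illustrative only, not part of the original module). The mocked
# readline returns the canned lines one at a time and then flags the stream as closed.
def _example_make_mock_stream_readline_usage():
    mock_stream = mock.Mock()
    mock_stream.readline = make_mock_stream_readline(
        mock_stream, ['line 1\n', 'line 2\n'], stop_counter=2)

    assert mock_stream.readline() == 'line 1\n'
    assert mock_stream.readline() == 'line 2\n'
    assert mock_stream.readline() is None
    assert mock_stream.closed is True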