Test Failed
Pull Request — master (#4213)
by W
04:40
created

SystemScopeDatastoreFunctionTest.test_key_exists()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import unittest2
19
20
import st2tests
21
22
# XXX: actionsensor import depends on config being set up.
23
import st2tests.config as tests_config
24
tests_config.parse_args()
25
26
from functions import st2kv
27
from orchestra import exceptions as exc
28
29
from st2common.constants import keyvalue as kvp_const
30
from st2common.models.api import keyvalue as kvp_api
31
from st2common.models.db import auth as auth_db
32
from st2common.models.db import keyvalue as kvp_db
33
from st2common.persistence import keyvalue as kvp_db_access
34
from st2common.util import crypto
35
36
37
# Mock context handed to st2kv.st2kv_ by the tests; its st2 section names the user 'stanley'.
MOCK_ORCHESTRA_CTX = {'__vars': {'st2': {'user': 'stanley'}}}
38
# Mock context whose st2 section is empty, i.e. it carries no 'user' entry.
MOCK_ORCHESTRA_CTX_NO_USER = {'__vars': {'st2': {}}}
39
40
41
class DatastoreFunctionTest(unittest2.TestCase):
    """Tests for how ``st2kv.st2kv_`` reacts to an incomplete context or bad arguments."""

    def test_missing_user_context(self):
        # The context used here has an empty 'st2' section (no 'user' entry).
        with self.assertRaises(KeyError):
            st2kv.st2kv_(MOCK_ORCHESTRA_CTX_NO_USER, 'foo')

    def test_invalid_input(self):
        # Every (context, key) pair below is expected to raise TypeError.
        bad_calls = (
            (None, 123),
            ({}, 123),
            ({}, dict()),
            ({}, object()),
            ({}, [1, 2]),
        )

        for context, key in bad_calls:
            with self.assertRaises(TypeError):
                st2kv.st2kv_(context, key)
52
53
54
class UserScopeDatastoreFunctionTest(st2tests.DbTestCase):
    """Exercises ``st2kv.st2kv_`` against key-value pairs stored with the full user scope."""

    @classmethod
    def setUpClass(cls):
        super(UserScopeDatastoreFunctionTest, cls).setUpClass()
        # Persist a user with the same name that MOCK_ORCHESTRA_CTX carries.
        auth_db.UserDB(name='stanley').save()

    def setUp(self):
        super(UserScopeDatastoreFunctionTest, self).setUp()
        user_scope = kvp_const.FULL_USER_SCOPE

        # Plain pair: 'stanley:foo' holds 'bar'.
        plain_pair = kvp_db.KeyValuePairDB(name='stanley:foo', value='bar', scope=user_scope)
        self.kvp = kvp_db_access.KeyValuePair.add_or_update(plain_pair)

        # Secret pair: 'stanley:fu' holds 'bar' encrypted with the API crypto key.
        encrypted = crypto.symmetric_encrypt(kvp_api.KeyValuePairAPI.crypto_key, 'bar')
        secret_pair = kvp_db.KeyValuePairDB(
            name='stanley:fu',
            value=encrypted,
            scope=user_scope,
            secret=True
        )
        self.secret_kvp = kvp_db_access.KeyValuePair.add_or_update(secret_pair)

    def tearDown(self):
        # Delete whichever fixtures setUp managed to store, plain pair first.
        for attr_name in ('kvp', 'secret_kvp'):
            stored = getattr(self, attr_name, None)
            if stored:
                stored.delete()

        super(UserScopeDatastoreFunctionTest, self).tearDown()

    def test_key_exists(self):
        looked_up = st2kv.st2kv_(MOCK_ORCHESTRA_CTX, 'foo')
        self.assertEqual(looked_up, 'bar')

    def test_key_does_not_exist(self):
        # setUp stores no 'stanley:foobar' pair.
        with self.assertRaises(exc.ExpressionEvaluationException):
            st2kv.st2kv_(MOCK_ORCHESTRA_CTX, 'foobar')

    def test_key_decrypt(self):
        decrypted = st2kv.st2kv_(MOCK_ORCHESTRA_CTX, 'fu', decrypt=True)
        self.assertEqual(decrypted, 'bar')
99
100
101
class SystemScopeDatastoreFunctionTest(st2tests.DbTestCase):
    """Exercises ``st2kv.st2kv_`` against key-value pairs stored with the full system scope."""

    @classmethod
    def setUpClass(cls):
        super(SystemScopeDatastoreFunctionTest, cls).setUpClass()
        # Persist a user with the same name that MOCK_ORCHESTRA_CTX carries.
        auth_db.UserDB(name='stanley').save()

    def setUp(self):
        super(SystemScopeDatastoreFunctionTest, self).setUp()
        system_scope = kvp_const.FULL_SYSTEM_SCOPE

        # Plain pair: 'foo' holds 'bar'.
        plain_pair = kvp_db.KeyValuePairDB(name='foo', value='bar', scope=system_scope)
        self.kvp = kvp_db_access.KeyValuePair.add_or_update(plain_pair)

        # Secret pair: 'fu' holds 'bar' encrypted with the API crypto key.
        encrypted = crypto.symmetric_encrypt(kvp_api.KeyValuePairAPI.crypto_key, 'bar')
        secret_pair = kvp_db.KeyValuePairDB(
            name='fu',
            value=encrypted,
            scope=system_scope,
            secret=True
        )
        self.secret_kvp = kvp_db_access.KeyValuePair.add_or_update(secret_pair)

    def tearDown(self):
        # Delete whichever fixtures setUp managed to store, plain pair first.
        for attr_name in ('kvp', 'secret_kvp'):
            stored = getattr(self, attr_name, None)
            if stored:
                stored.delete()

        super(SystemScopeDatastoreFunctionTest, self).tearDown()

    def test_key_exists(self):
        looked_up = st2kv.st2kv_(MOCK_ORCHESTRA_CTX, 'system.foo')
        self.assertEqual(looked_up, 'bar')

    def test_key_does_not_exist(self):
        # The key is passed as 'foo', without the 'system.' prefix the other tests use.
        # NOTE(review): presumably it is then resolved outside the system scope -- confirm
        # against st2kv.
        with self.assertRaises(exc.ExpressionEvaluationException):
            st2kv.st2kv_(MOCK_ORCHESTRA_CTX, 'foo')

    def test_key_decrypt(self):
        decrypted = st2kv.st2kv_(MOCK_ORCHESTRA_CTX, 'system.fu', decrypt=True)
        self.assertEqual(decrypted, 'bar')
146