Test Failed
Pull Request — master (#4213)
by W
04:40
created

DatastoreFunctionTest.test_st2kv_system_scope()   A

Complexity

Conditions 1

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 16
rs 9.6
# -*- coding: utf-8 -*-

# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17
from integration.orchestra import base
18
from st2client import models
19
from st2common.constants import action as ac_const
20
21
22
class DatastoreFunctionTest(base.TestWorkflowExecution):
    """Integration tests for reading datastore values from a workflow.

    Each test stores a key-value pair through the st2 client, runs the
    ``examples.orchestra-st2kv`` workflow against that key and checks that
    the workflow output carries the stored value.
    """

    @classmethod
    def set_kvp(cls, name, value, scope='system', secret=False):
        """Send a key-value pair to ``cls.st2client.keys.update``.

        :param name: Key name; also used as the ``id`` of the pair.
        :param value: Value to store under the key.
        :param scope: Scope passed to the ``KeyValuePair`` model.
        :param secret: ``secret`` flag passed to the ``KeyValuePair`` model.
        """
        kvp = models.KeyValuePair(
            id=name,
            name=name,
            value=value,
            scope=scope,
            secret=secret
        )

        cls.st2client.keys.update(kvp)

    @classmethod
    def del_kvp(cls, name, scope='system'):
        """Send a key-value pair to ``cls.st2client.keys.delete``.

        :param name: Key name; also used as the ``id`` of the pair.
        :param scope: Scope passed to the ``KeyValuePair`` model.
        """
        kvp = models.KeyValuePair(
            id=name,
            name=name,
            scope=scope
        )

        cls.st2client.keys.delete(kvp)

    def _assert_workflow_returns_value(self, wf_input, expected_value):
        """Run the st2kv example workflow and check the value it outputs.

        :param wf_input: Input dict handed to the workflow.
        :param expected_value: Value expected at ``result['output']['value']``.
        """
        execution = self._execute_workflow('examples.orchestra-st2kv', wf_input)
        output = self._wait_for_completion(execution)

        self.assertEqual(output.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        self.assertIn('output', output.result)
        self.assertIn('value', output.result['output'])
        self.assertEqual(expected_value, output.result['output']['value'])

    def test_st2kv_system_scope(self):
        """A key stored with the default scope is read via ``system.<key>``."""
        key = 'lakshmi'
        value = 'kanahansnasnasdlsajks'

        self.set_kvp(key, value)
        # Register the delete immediately so the key is removed even when the
        # workflow run or one of the assertions fails.
        self.addCleanup(self.del_kvp, key)

        self._assert_workflow_returns_value(
            {'key_name': 'system.%s' % key},
            value
        )

    def test_st2kv_user_scope(self):
        """A key stored with scope ``'user'`` is read by its bare name."""
        key = 'winson'
        value = 'SoDiamondEng'

        self.set_kvp(key, value, 'user')
        # NOTE(review): cleanup was disabled in the original test and is still
        # not performed here. ``del_kvp`` defaults to scope='system', so
        # enabling it would need ``self.del_kvp(key, 'user')`` - confirm that
        # user-scoped deletion works through the client before adding it.

        self._assert_workflow_returns_value({'key_name': key}, value)

    def test_st2kv_decrypt(self):
        """A key stored with ``secret=True`` is read with ``decrypt`` set."""
        key = 'kami'
        value = 'eggplant'

        self.set_kvp(key, value, secret=True)
        # Register the delete immediately so the key is removed even when the
        # workflow run or one of the assertions fails.
        self.addCleanup(self.del_kvp, key)

        wf_input = {
            'key_name': 'system.%s' % key,
            'decrypt': True
        }

        self._assert_workflow_returns_value(wf_input, value)
99