Completed
Push — master ( 1ed431...35dbab )
by
unknown
17s queued 13s
created

DataPecklerTestCase.test_store_data_failed()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import pickle
19
20
from pathlib import Path
21
from hashlib import sha256
22
from unittest import TestCase
23
from unittest.mock import patch
24
25
from ospd.errors import OspdCommandError
26
from ospd.datapickler import DataPickler
27
28
from .helper import assert_called
29
30
31
class DataPecklerTestCase(TestCase):
    """Tests for DataPickler: storing, loading and removing pickled data.

    Tests that write a file use '/tmp' as storage path and register the
    removal of that file with addCleanup(), so the file is deleted even
    when an assertion fails. Several tests share the same file name, so a
    leftover file could otherwise influence later tests.
    """

    def test_store_data(self):
        """store_data() must return the SHA-256 hex digest of the pickle."""
        data = {'foo', 'bar'}
        filename = 'scan_info_1'
        expected_hash = sha256(pickle.dumps(data)).hexdigest()

        data_pickler = DataPickler('/tmp')
        ret = data_pickler.store_data(filename, data)
        # Registered only after a successful store, so the cleanup never
        # runs against a file that was not created.
        self.addCleanup(data_pickler.remove_file, filename)

        self.assertEqual(ret, expected_hash)

    def test_store_data_failed(self):
        """store_data() must raise OspdCommandError if it cannot store."""
        data = {'foo', 'bar'}
        filename = 'scan_info_1'

        # NOTE(review): presumably '/root' is used because it is not
        # writable for the test user; this would not hold when the tests
        # run as root -- confirm for the CI environment.
        data_pickler = DataPickler('/root')

        self.assertRaises(
            OspdCommandError, data_pickler.store_data, filename, data
        )

    def test_store_data_check_permission(self):
        """The stored file must be a regular file with mode 0600."""
        OWNER_ONLY_RW_PERMISSION = '0o100600'  # pylint: disable=invalid-name
        data = {'foo', 'bar'}
        filename = 'scan_info_1'

        data_pickler = DataPickler('/tmp')
        data_pickler.store_data(filename, data)
        self.addCleanup(data_pickler.remove_file, filename)

        file_path = (
            Path(data_pickler._storage_path)  # pylint: disable=protected-access
            / filename
        )
        # st_mode carries the file type bits as well, hence 0o100600
        # instead of plain 0o600.
        self.assertEqual(
            oct(file_path.stat().st_mode), OWNER_ONLY_RW_PERMISSION
        )

    def test_load_data(self):
        """load_data() must return the stored data for a matching hash."""
        data_pickler = DataPickler('/tmp')

        data = {'foo', 'bar'}
        filename = 'scan_info_1'
        pickled_data_hash = sha256(pickle.dumps(data)).hexdigest()

        ret = data_pickler.store_data(filename, data)
        # Remove the stored file when the test ends; previously this test
        # left it behind in the storage path.
        self.addCleanup(data_pickler.remove_file, filename)
        self.assertEqual(ret, pickled_data_hash)

        original_data = data_pickler.load_data(filename, pickled_data_hash)
        self.assertIsNotNone(original_data)

        self.assertIn('foo', original_data)

    @patch("ospd.datapickler.logger")
    def test_remove_file_failed(self, mock_logger):
        """remove_file() must log an error if the file cannot be removed."""
        filename = 'inenxistent_file'
        data_pickler = DataPickler('/root')
        data_pickler.remove_file(filename)

        assert_called(mock_logger.error)

    @patch("ospd.datapickler.logger")
    def test_load_data_no_file(self, mock_logger):
        """load_data() must log an error and return None without a file."""
        filename = 'scan_info_1'
        data_pickler = DataPickler('/tmp')

        data_loaded = data_pickler.load_data(filename, "1234")
        assert_called(mock_logger.error)
        self.assertIsNone(data_loaded)

        # Kept inside the test body (not addCleanup) so that it runs while
        # the logger is still patched.
        data_pickler.remove_file(filename)

    def test_load_data_corrupted(self):
        """load_data() must return None if the file content was altered."""
        data_pickler = DataPickler('/tmp')

        data = {'foo', 'bar'}
        filename = 'scan_info_1'
        pickled_data_hash = sha256(pickle.dumps(data)).hexdigest()

        ret = data_pickler.store_data(filename, data)
        self.addCleanup(data_pickler.remove_file, filename)
        self.assertEqual(ret, pickled_data_hash)

        # Corrupt the data by appending bytes to the stored file, so its
        # content no longer matches the hash returned by store_data().
        file_to_corrupt = (
            Path(data_pickler._storage_path)  # pylint: disable=protected-access
            / filename
        )
        with file_to_corrupt.open('ab') as f:
            f.write(b'bar2')

        original_data = data_pickler.load_data(filename, pickled_data_hash)
        self.assertIsNone(original_data)
140