Completed
Push — master ( ee6aac...b90bd1 )
by Björn
28s queued 12s
created

TestNVTICache.test_get_file_checksum()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 2
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=unused-argument, protected-access, invalid-name
21
22
""" Unit Test for ospd-openvas """
23
24
import logging
25
26
from unittest import TestCase
27
from unittest.mock import patch, Mock, PropertyMock
28
from pathlib import Path
29
30
from ospd_openvas.errors import OspdOpenvasError
31
from ospd_openvas.nvticache import NVTICache, NVTI_CACHE_NAME
32
33
from tests.helper import assert_called
34
35
36
@patch('ospd_openvas.nvticache.OpenvasDB')
37
class TestNVTICache(TestCase):
38
    @patch('ospd_openvas.db.MainDB')
    def setUp(self, MockMainDB):  # pylint: disable=arguments-differ
        """Create an NVTICache backed by a mocked MainDB for every test."""
        main_db = MockMainDB()
        cache = NVTICache(main_db)
        # Seed the context; tests that exercise the lookup reset it to None.
        cache._ctx = 'foo'

        self.db = main_db
        self.nvti = cache
43
44
    def test_set_index(self, MockOpenvasDB):
        """ctx and index are expected to come from the database lookup."""
        MockOpenvasDB.find_database_by_pattern.return_value = ('foo', 22)
        # Drop the context seeded in setUp so reading `ctx` must resolve it.
        self.nvti._ctx = None

        found_ctx = self.nvti.ctx

        self.assertIsNotNone(found_ctx)
        self.assertEqual(found_ctx, 'foo')
        self.assertEqual(self.nvti.index, 22)
54
55
    def test_get_feed_version(self, MockOpenvasDB):
        """get_feed_version() returns the item read with the cache name."""
        get_item = MockOpenvasDB.get_single_item
        get_item.return_value = '1234'

        version = self.nvti.get_feed_version()

        self.assertEqual(version, '1234')
        get_item.assert_called_with('foo', NVTI_CACHE_NAME)
62
63
    def test_get_feed_version_not_available(self, MockOpenvasDB):
        """get_feed_version() yields None when the lookup finds nothing."""
        find_db = MockOpenvasDB.find_database_by_pattern
        find_db.return_value = (None, None)

        # A PropertyMock has to be attached to the type, not the instance.
        type(self.db).max_database_index = PropertyMock(return_value=123)
        self.nvti._ctx = None

        version = self.nvti.get_feed_version()

        self.assertIsNone(version)
        find_db.assert_called_with(NVTI_CACHE_NAME, 123)
76
77
    def test_get_oids(self, MockOpenvasDB):
        """get_oids() returns what get_filenames_and_oids() provides."""
        lookup = MockOpenvasDB.get_filenames_and_oids
        lookup.return_value = ['oids']

        oids = self.nvti.get_oids()

        self.assertEqual(oids, ['oids'])
83
84
    def test_parse_metadata_tag_missing_value(self, MockOpenvasDB):
        """A tag without a '=value' part gives {} and logs an error."""
        tags = 'tag1'

        # Patch Logger.error only for the duration of the call. Assigning a
        # Mock straight to logging.Logger.error is never undone and would
        # replace error logging for every test that runs afterwards.
        with patch.object(logging.Logger, 'error') as mock_error:
            ret = NVTICache._parse_metadata_tags(tags, '1.2.3')

        self.assertEqual(ret, {})
        assert_called(mock_error)
96
97
    def test_parse_metadata_tag(self, MockOpenvasDB):
        """A single 'key=value' tag is parsed into a one-entry dict."""
        parsed = NVTICache._parse_metadata_tags('tag1=value1', '1.2.3')

        self.assertEqual(parsed, {'tag1': 'value1'})
106
107
    def test_parse_metadata_tags(self, MockOpenvasDB):
        """Tags separated by '|' are parsed into one dict entry each."""
        raw_tags = 'tag1=value1|foo=bar'

        parsed = NVTICache._parse_metadata_tags(raw_tags, '1.2.3')

        self.assertEqual(parsed, {'tag1': 'value1', 'foo': 'bar'})
116
117
    def test_get_nvt_params(self, MockOpenvasDB):
        """get_nvt_params() turns '|||'-separated preferences into a dict.

        An empty default field and a missing default field are both
        expected to produce an empty 'default' value.
        """

        def expected_params(default):
            # Only the default differs between the cases below.
            return {
                '1': {
                    'id': '1',
                    'type': 'entry',
                    'default': default,
                    'name': 'dns-fuzz.timelimit',
                    'description': 'Description',
                },
            }

        cases = (
            (['1|||dns-fuzz.timelimit|||entry|||default'], 'default'),
            (['1|||dns-fuzz.timelimit|||entry|||'], ''),
            (['1|||dns-fuzz.timelimit|||entry'], ''),
        )

        for stored_prefs, default in cases:
            MockOpenvasDB.get_list_item.return_value = stored_prefs

            params = self.nvti.get_nvt_params('1.2.3.4')

            self.assertEqual(params, expected_params(default))
156
157
    def test_get_nvt_metadata(self, MockOpenvasDB):
        """get_nvt_metadata() builds a dict from the entry and its prefs.

        The first get_list_item() call is answered with the raw NVT entry
        and the second one with the preference list.
        """
        # Show the complete dict diff should the comparison fail.
        self.maxDiff = None

        summary = (
            'Detects the installed version of\n  Mantis a '
            'free popular web-based bugtracking system.\n'
            '\n  This script sends HTTP GET request and t'
            'ry to get the version from the\n  response, '
            'and sets the result in KB.'
        )
        # The tag field is one '|'-separated string of key=value pairs.
        tags = '|'.join(
            [
                'cvss_base_vector=AV:N/AC:L/Au:N/C:N/I:N/A:N',
                'last_modification=1533906565',
                'creation_date=1237458156',
                'summary=' + summary,
                'qod_type=remote_banner',
            ]
        )
        nvt_entry = [
            'mantis_detect.nasl',
            '',
            '',
            'Settings/disable_cgi_scanning',
            '',
            'Services/www, 80',
            'find_service.nasl, http_version.nasl',
            tags,
            '',
            '',
            'URL:http://www.mantisbt.org/',
            '3',
            '10',
            'Product detection',
            'Mantis Detection',
        ]
        nvt_prefs = ['1|||dns-fuzz.timelimit|||entry|||default']

        expected = {
            'category': '3',
            'creation_date': '1237458156',
            'cvss_base_vector': 'AV:N/AC:L/Au:N/C:N/I:N/A:N',
            'dependencies': 'find_service.nasl, http_version.nasl',
            'excluded_keys': 'Settings/disable_cgi_scanning',
            'family': 'Product detection',
            'filename': 'mantis_detect.nasl',
            'last_modification': '1533906565',
            'name': 'Mantis Detection',
            'qod_type': 'remote_banner',
            'refs': {'xref': ['URL:http://www.mantisbt.org/']},
            'required_ports': 'Services/www, 80',
            'summary': summary,
            'vt_params': {
                '0': {
                    'id': '0',
                    'type': 'entry',
                    'name': 'timeout',
                    'description': 'Script Timeout',
                    'default': '10',
                },
                '1': {
                    'id': '1',
                    'type': 'entry',
                    'name': 'dns-fuzz.timelimit',
                    'description': 'Description',
                    'default': 'default',
                },
            },
        }

        MockOpenvasDB.get_list_item.side_effect = [nvt_entry, nvt_prefs]

        result = self.nvti.get_nvt_metadata('1.2.3.4')

        self.assertEqual(result, expected)
227
228
    def test_get_nvt_metadata_fail(self, MockOpenvasDB):
        """get_nvt_metadata() gives None when the database has no entry."""
        MockOpenvasDB.get_list_item.return_value = []

        metadata = self.nvti.get_nvt_metadata('1.2.3.4')

        self.assertIsNone(metadata)
234
235
    def test_get_nvt_refs(self, MockOpenvasDB):
        """get_nvt_refs() maps the three stored fields to cve/bid/xref."""
        stored_refs = ['', '', 'URL:http://www.mantisbt.org/']
        MockOpenvasDB.get_list_item.return_value = stored_refs

        refs = self.nvti.get_nvt_refs('1.2.3.4')

        expected = {
            'cve': [''],
            'bid': [''],
            'xref': ['URL:http://www.mantisbt.org/'],
        }
        self.assertEqual(refs, expected)
248
249
    def test_get_nvt_refs_fail(self, MockOpenvasDB):
        """get_nvt_refs() gives None when the database has no entry."""
        MockOpenvasDB.get_list_item.return_value = []

        refs = self.nvti.get_nvt_refs('1.2.3.4')

        self.assertIsNone(refs)
255
256
    def test_get_nvt_prefs(self, MockOpenvasDB):
        """get_nvt_prefs() returns the stored preference list as is."""
        stored_prefs = ['dns-fuzz.timelimit|||entry|||default']
        MockOpenvasDB.get_list_item.return_value = stored_prefs

        result = self.nvti.get_nvt_prefs('1.2.3.4')

        self.assertEqual(result, stored_prefs)
264
265
    def test_get_nvt_timeout(self, MockOpenvasDB):
        """get_nvt_timeout() returns the single item read for the OID."""
        MockOpenvasDB.get_single_item.return_value = '300'

        timeout = self.nvti.get_nvt_timeout('1.2.3.4')

        self.assertEqual(timeout, '300')
271
272
    def test_get_nvt_tags(self, MockOpenvasDB):
        """get_nvt_tags() splits the stored tag string into a dict."""
        # The stored value is one '|'-separated string of key=value pairs.
        raw_tags = '|'.join(
            [
                'last_modification=1533906565',
                'creation_date=1517443741',
                'cvss_base_vector=AV:N/AC:L/Au:N/C:P/I:P/A:P',
                'solution_type=VendorFix',
                'qod_type=package',
                'affected=rubygems on Debian Linux',
                'solution_method=DebianAPTUpgrade',
            ]
        )
        MockOpenvasDB.get_single_item.return_value = raw_tags

        parsed = self.nvti.get_nvt_tags('1.2.3.4')

        expected = {
            'last_modification': '1533906565',
            'creation_date': '1517443741',
            'cvss_base_vector': 'AV:N/AC:L/Au:N/C:P/I:P/A:P',
            'solution_type': 'VendorFix',
            'qod_type': 'package',
            'affected': 'rubygems on Debian Linux',
            'solution_method': 'DebianAPTUpgrade',
        }
        self.assertEqual(expected, parsed)
296
297
    def test_get_nvt_files_count(self, MockOpenvasDB):
        """get_nvt_files_count() counts the keys matching 'filename:*'."""
        key_count = MockOpenvasDB.get_key_count
        key_count.return_value = 20

        count = self.nvti.get_nvt_files_count()

        self.assertEqual(count, 20)
        key_count.assert_called_with('foo', 'filename:*')
302
303
    def test_get_nvt_count(self, MockOpenvasDB):
        """get_nvt_count() counts the keys matching 'nvt:*'."""
        key_count = MockOpenvasDB.get_key_count
        key_count.return_value = 20

        count = self.nvti.get_nvt_count()

        self.assertEqual(count, 20)
        key_count.assert_called_with('foo', 'nvt:*')
308
309
    def test_force_reload(self, _MockOpenvasDB):
        """force_reload() releases the cache's database on the main DB."""
        cache = self.nvti

        cache.force_reload()

        self.db.release_database.assert_called_with(cache)
313
314
    def test_flush(self, _MockOpenvasDB):
        """flush() calls flushdb() without arguments on the context."""
        cache = self.nvti
        # A Mock context records the flushdb() call for the check below.
        cache._ctx = Mock()

        cache.flush()

        cache._ctx.flushdb.assert_called_with()
320
321
    def test_add_vt(self, MockOpenvasDB):
        """add_vt_to_cache() stores the list under the given id via the DB."""
        letters = 'abcdefghijklmno'
        MockOpenvasDB.add_single_list = Mock()

        self.nvti.add_vt_to_cache('1234', list(letters))

        # Compare against a separate list object so that in-place changes
        # to the passed argument would not go unnoticed.
        MockOpenvasDB.add_single_list.assert_called_with(
            'foo', '1234', list(letters)
        )
365
366
    def test_get_file_checksum(self, MockOpenvasDB):
        """get_file_checksum() reads the 'sha256sums:<path>' item."""
        get_item = MockOpenvasDB.get_single_item
        get_item.return_value = '123456'

        checksum = self.nvti.get_file_checksum(Path("/tmp/foo.csv"))

        self.assertEqual(checksum, '123456')
        get_item.assert_called_with('foo', "sha256sums:/tmp/foo.csv")
376