TestNVTICache.test_get_nvt_params()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 39
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 27
nop 2
dl 0
loc 39
rs 9.232
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=unused-argument, protected-access, invalid-name
21
22
""" Unit Test for ospd-openvas """
23
24
import logging
25
26
from unittest import TestCase
27
from unittest.mock import patch, Mock, PropertyMock
28
from pathlib import Path
29
30
from ospd_openvas.nvticache import NVTICache, NVTI_CACHE_NAME
31
32
from tests.helper import assert_called
33
34
35
@patch('ospd_openvas.nvticache.OpenvasDB')
36
class TestNVTICache(TestCase):
37
    @patch('ospd_openvas.db.MainDB')
    def setUp(self, MockMainDB):  # pylint: disable=arguments-differ
        """Create an NVTICache on top of a mocked MainDB for every test."""
        main_db = MockMainDB()
        cache = NVTICache(main_db)
        # Start with a preset context; tests that want the context to be
        # looked up again set this attribute back to None themselves.
        cache._ctx = 'foo'

        self.db = main_db
        self.nvti = cache
42
43
    def test_set_index(self, MockOpenvasDB):
        """Check that ctx and index match what the database lookup returns."""
        MockOpenvasDB.find_database_by_pattern.return_value = ('foo', 22)
        # Remove the context preset in setUp before reading `ctx`.
        self.nvti._ctx = None

        found_ctx = self.nvti.ctx

        self.assertIsNotNone(found_ctx)
        self.assertEqual(found_ctx, 'foo')
        self.assertEqual(self.nvti.index, 22)
53
54
    def test_get_feed_version(self, MockOpenvasDB):
        """Check that the feed version is fetched through get_single_item."""
        get_item = MockOpenvasDB.get_single_item
        get_item.return_value = '1234'

        version = self.nvti.get_feed_version()

        self.assertEqual(version, '1234')
        # 'foo' is the context preset in setUp.
        get_item.assert_called_with('foo', NVTI_CACHE_NAME)
61
62
    def test_get_feed_version_not_available(self, MockOpenvasDB):
        """Check that None is returned when the lookup finds no database."""
        find_db = MockOpenvasDB.find_database_by_pattern
        find_db.return_value = (None, None)
        # A PropertyMock has to be attached to the mock's type, not to the
        # mock instance, to behave like a property.
        type(self.db).max_database_index = PropertyMock(return_value=123)
        self.nvti._ctx = None

        version = self.nvti.get_feed_version()

        self.assertIsNone(version)
        find_db.assert_called_with(NVTI_CACHE_NAME, 123)
75
76
    def test_get_oids(self, MockOpenvasDB):
        """Check that get_oids returns what get_filenames_and_oids yields."""
        lookup = MockOpenvasDB.get_filenames_and_oids
        lookup.return_value = ['oids']

        oids = self.nvti.get_oids()

        self.assertEqual(oids, ['oids'])
82
83
    def test_parse_metadata_tag_missing_value(self, MockOpenvasDB):
        """Check that a tag without a value is dropped and logged as error.

        ``logging.Logger.error`` is replaced only while the parser runs and
        is restored when the ``with`` block exits. Assigning a Mock to the
        class attribute directly would stay in place for the rest of the
        process and affect every test that runs afterwards.
        """
        tags = 'tag1'

        with patch.object(logging.Logger, 'error') as mock_error:
            ret = (
                NVTICache._parse_metadata_tags(  # pylint: disable=protected-access
                    tags, '1.2.3'
                )
            )

        self.assertEqual(ret, {})
        assert_called(mock_error)
95
96
    def test_parse_metadata_tag(self, MockOpenvasDB):
        """Check that a single 'name=value' tag is parsed into a dict."""
        # pylint: disable=protected-access
        parsed = NVTICache._parse_metadata_tags('tag1=value1', '1.2.3')

        self.assertEqual(parsed, {'tag1': 'value1'})
105
106
    def test_parse_metadata_tags(self, MockOpenvasDB):
        """Check that '|'-separated tags each end up as a dict entry."""
        # pylint: disable=protected-access
        parsed = NVTICache._parse_metadata_tags('tag1=value1|foo=bar', '1.2.3')

        self.assertEqual(parsed, {'tag1': 'value1', 'foo': 'bar'})
115
116
    def test_get_nvt_params(self, MockOpenvasDB):
        """Check how stored preference entries become VT parameters.

        Three raw preference strings are served by the mocked
        ``get_list_item`` one after another: one carrying a default value,
        one with an empty default field and one with the default field
        missing. The last two are both expected to produce an empty
        default.
        """

        def params_with_default(default):
            # Expected result for the single preference used below; only
            # the default value differs between the cases.
            return {
                '1': {
                    'id': '1',
                    'type': 'entry',
                    'default': default,
                    'name': 'dns-fuzz.timelimit',
                    'description': 'Description',
                },
            }

        cases = (
            ('1|||dns-fuzz.timelimit|||entry|||default', 'default'),
            ('1|||dns-fuzz.timelimit|||entry|||', ''),
            ('1|||dns-fuzz.timelimit|||entry', ''),
        )

        for raw_pref, default in cases:
            MockOpenvasDB.get_list_item.return_value = [raw_pref]

            resp = self.nvti.get_nvt_params('1.2.3.4')

            self.assertEqual(resp, params_with_default(default))
155
156
    def test_get_nvt_metadata(self, MockOpenvasDB):
        """Check that a stored VT entry is turned into a metadata dict.

        The mocked ``get_list_item`` first serves the raw entry and then
        the preference list; the result is compared with the full expected
        dictionary, including the parsed tags, refs and VT parameters.
        """
        # Show the complete diff if the comparison below fails.
        self.maxDiff = None

        summary = (
            'Detects the installed version of\n'
            '  Mantis a free popular web-based bugtracking system.\n'
            '\n'
            '  This script sends HTTP GET request and try to get the '
            'version from the\n'
            '  response, and sets the result in KB.'
        )
        tags = '|'.join(
            [
                'cvss_base_vector=AV:N/AC:L/Au:N/C:N/I:N/A:N',
                'last_modification=1533906565',
                'creation_date=1237458156',
                'summary=' + summary,
                'qod_type=remote_banner',
            ]
        )
        raw_entry = [
            'mantis_detect.nasl',
            '',
            '',
            'Settings/disable_cgi_scanning',
            '',
            'Services/www, 80',
            'find_service.nasl, http_version.nasl',
            tags,
            '',
            '',
            'URL:http://www.mantisbt.org/',
            '3',
            '10',
            'Product detection',
            'Mantis Detection',
        ]
        raw_prefs = ['1|||dns-fuzz.timelimit|||entry|||default']

        expected_params = {
            '0': {
                'id': '0',
                'type': 'entry',
                'name': 'timeout',
                'description': 'Script Timeout',
                'default': '10',
            },
            '1': {
                'id': '1',
                'type': 'entry',
                'name': 'dns-fuzz.timelimit',
                'description': 'Description',
                'default': 'default',
            },
        }
        expected = {
            'category': '3',
            'creation_date': '1237458156',
            'cvss_base_vector': 'AV:N/AC:L/Au:N/C:N/I:N/A:N',
            'dependencies': 'find_service.nasl, http_version.nasl',
            'excluded_keys': 'Settings/disable_cgi_scanning',
            'family': 'Product detection',
            'filename': 'mantis_detect.nasl',
            'last_modification': '1533906565',
            'name': 'Mantis Detection',
            'qod_type': 'remote_banner',
            'refs': {'xref': ['URL:http://www.mantisbt.org/']},
            'required_ports': 'Services/www, 80',
            'summary': summary,
            'vt_params': expected_params,
        }

        # First lookup yields the entry itself, the second its preferences.
        MockOpenvasDB.get_list_item.side_effect = [raw_entry, raw_prefs]

        resp = self.nvti.get_nvt_metadata('1.2.3.4')

        self.assertEqual(resp, expected)
226
227
    def test_get_nvt_metadata_fail(self, MockOpenvasDB):
        """Check that an empty stored entry results in None."""
        MockOpenvasDB.get_list_item.return_value = []

        metadata = self.nvti.get_nvt_metadata('1.2.3.4')

        self.assertIsNone(metadata)
233
234
    def test_get_nvt_refs(self, MockOpenvasDB):
        """Check that the three stored ref fields map to cve, bid and xref."""
        MockOpenvasDB.get_list_item.return_value = [
            '',
            '',
            'URL:http://www.mantisbt.org/',
        ]
        expected = {
            'cve': [''],
            'bid': [''],
            'xref': ['URL:http://www.mantisbt.org/'],
        }

        refs = self.nvti.get_nvt_refs('1.2.3.4')

        self.assertEqual(refs, expected)
247
248
    def test_get_nvt_refs_fail(self, MockOpenvasDB):
        """Check that an empty stored ref list results in None."""
        MockOpenvasDB.get_list_item.return_value = []

        refs = self.nvti.get_nvt_refs('1.2.3.4')

        self.assertIsNone(refs)
254
255
    def test_get_nvt_prefs(self, MockOpenvasDB):
        """Check that stored preferences are returned as they are."""
        stored = ['dns-fuzz.timelimit|||entry|||default']
        MockOpenvasDB.get_list_item.return_value = stored

        returned = self.nvti.get_nvt_prefs('1.2.3.4')

        self.assertEqual(returned, stored)
263
264
    def test_get_nvt_timeout(self, MockOpenvasDB):
        """Check that the timeout comes back as stored by get_single_item."""
        MockOpenvasDB.get_single_item.return_value = '300'

        timeout = self.nvti.get_nvt_timeout('1.2.3.4')

        self.assertEqual(timeout, '300')
270
271
    def test_get_nvt_tags(self, MockOpenvasDB):
        """Check that a stored '|'-separated tag string becomes a dict."""
        stored_tags = '|'.join(
            [
                'last_modification=1533906565',
                'creation_date=1517443741',
                'cvss_base_vector=AV:N/AC:L/Au:N/C:P/I:P/A:P',
                'solution_type=VendorFix',
                'qod_type=package',
                'affected=rubygems on Debian Linux',
                'solution_method=DebianAPTUpgrade',
            ]
        )
        expected = {
            'last_modification': '1533906565',
            'creation_date': '1517443741',
            'cvss_base_vector': 'AV:N/AC:L/Au:N/C:P/I:P/A:P',
            'solution_type': 'VendorFix',
            'qod_type': 'package',
            'affected': 'rubygems on Debian Linux',
            'solution_method': 'DebianAPTUpgrade',
        }
        MockOpenvasDB.get_single_item.return_value = stored_tags

        parsed = self.nvti.get_nvt_tags('1.2.3.4')

        self.assertEqual(expected, parsed)
295
296
    def test_get_nvt_files_count(self, MockOpenvasDB):
        """Check that files are counted with the 'filename:*' key pattern."""
        key_count = MockOpenvasDB.get_key_count
        key_count.return_value = 20

        count = self.nvti.get_nvt_files_count()

        self.assertEqual(count, 20)
        # 'foo' is the context preset in setUp.
        key_count.assert_called_with('foo', 'filename:*')
301
302
    def test_get_nvt_count(self, MockOpenvasDB):
        """Check that VTs are counted with the 'nvt:*' key pattern."""
        key_count = MockOpenvasDB.get_key_count
        key_count.return_value = 20

        count = self.nvti.get_nvt_count()

        self.assertEqual(count, 20)
        # 'foo' is the context preset in setUp.
        key_count.assert_called_with('foo', 'nvt:*')
307
308
    def test_force_reload(self, _MockOpenvasDB):
        """Check that force_reload releases the cache on the main database."""
        release = self.db.release_database

        self.nvti.force_reload()

        release.assert_called_with(self.nvti)
312
313
    def test_flush(self, _MockOpenvasDB):
        """Check that flush calls flushdb() on the cache's context."""
        # Swap the string context from setUp for a mock that records calls.
        fake_ctx = Mock()
        self.nvti._ctx = fake_ctx

        self.nvti.flush()

        self.nvti._ctx.flushdb.assert_called_with()
319
320
    def test_add_vt(self, MockOpenvasDB):
        """Check that add_vt_to_cache forwards key and fields unchanged.

        The fifteen single-letter fields 'a' to 'o' are expected to reach
        ``OpenvasDB.add_single_list`` together with the cache context and
        the given key.
        """
        letters = 'abcdefghijklmno'
        add_list = Mock()
        MockOpenvasDB.add_single_list = add_list

        self.nvti.add_vt_to_cache('1234', list(letters))

        # Compare against a freshly built list, not the object passed in.
        add_list.assert_called_with('foo', '1234', list(letters))
364
365
    def test_get_file_checksum(self, MockOpenvasDB):
        """Check that a checksum is looked up under 'sha256sums:<path>'."""
        get_item = MockOpenvasDB.get_single_item
        get_item.return_value = '123456'

        checksum = self.nvti.get_file_checksum(Path("/tmp/foo.csv"))

        self.assertEqual(checksum, '123456')
        # 'foo' is the context preset in setUp.
        get_item.assert_called_with('foo', "sha256sums:/tmp/foo.csv")
375