Passed
Pull Request — master (#364)
by Juan José
01:39
created

NotusTestCase.test_metadata_path()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 2
dl 0
loc 12
rs 9.9
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
import logging
20
import unittest
21
22
from csv import DictReader
23
from pathlib import Path, PurePath
24
from collections import OrderedDict
25
from unittest.mock import patch, MagicMock
26
27
from ospd_openvas.notus.metadata import (
28
    NotusMetadataHandler,
29
    EXPECTED_FIELD_NAMES_LIST,
30
    METADATA_DIRECTORY_NAME,
31
)
32
from ospd_openvas.errors import OspdOpenvasError
33
34
35
class NotusTestCase(unittest.TestCase):
    """Unit tests for ``NotusMetadataHandler``.

    ``Openvas`` and ``NVTICache`` are replaced by mocks; file based tests
    read the example CSV referenced by ``EXAMPLE_CSV``.
    """

    # Example CSV file read by the checksum, lookup and upload tests.
    EXAMPLE_CSV = "./tests/notus/example.csv"

    # Field names, in the order test_check_field_names_lsc expects the
    # handler to accept.
    VALID_FIELD_NAMES = (
        "OID",
        "TITLE",
        "CREATION_DATE",
        "LAST_MODIFICATION",
        "SOURCE_PKGS",
        "ADVISORY_ID",
        "SEVERITY_ORIGIN",
        "SEVERITY_DATE",
        "SEVERITY_VECTOR",
        "ADVISORY_XREF",
        "DESCRIPTION",
        "INSIGHT",
        "AFFECTED",
        "CVE_LIST",
        "BINARY_PACKAGES_FOR_RELEASES",
        "XREFS",
    )

    @patch('ospd_openvas.nvticache.NVTICache')
    def setUp(self, MockNvti):
        """Keep a mocked NVTICache instance for tests that inject one.

        The patch itself is only active while setUp runs; the mock
        instance created here stays usable afterwards.
        """
        self.nvti = MockNvti()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _make_advisory_dict(**overrides):
        """Return an advisory row, optionally replacing single values.

        Arguments:
            overrides: Field name / value pairs replacing the defaults.
                Replacing an existing key keeps its position in the
                returned OrderedDict.
        """
        advisory_dict = OrderedDict(
            [
                ('OID', '1.3.6.1.4.1.25623.1.1.2.2020.1234'),
                (
                    'TITLE',
                    'VendorOS: Security Advisory for git (VendorOS-2020-1234)',
                ),
                ('CREATION_DATE', '1600269468'),
                ('LAST_MODIFICATION', '1601380531'),
                ('SOURCE_PKGS', "['git']"),
                ('ADVISORY_ID', 'VendorOS-2020-1234'),
                ('CVSS_BASE_VECTOR', 'AV:N/AC:L/Au:N/C:C/I:C/A:C'),
                ('CVSS_BASE', '10.0'),
                ('ADVISORY_XREF', 'https://example.com'),
                ('DESCRIPTION', 'The remote host is missing an update.'),
                ('INSIGHT', 'buffer overflow'),
                ('AFFECTED', "'p1' package(s) on VendorOS V2.0SP1"),
                ('CVE_LIST', "['CVE-2020-1234']"),
                (
                    'BINARY_PACKAGES_FOR_RELEASES',
                    "{'VendorOS V2.0SP1': ['p1-1.1']}",
                ),
                ('XREFS', '[]'),
            ]
        )
        advisory_dict.update(overrides)
        return advisory_dict

    def _run_update_metadata(self, mock_openvas_cls, notus, **stub_results):
        """Run ``notus.update_metadata()`` with stubbed collaborators.

        The mocked Openvas settings enable ``table_driven_lsc``, the CSV
        lookup is stubbed to return only the example file and every
        keyword argument replaces the handler attribute of the same name
        with a MagicMock returning the given value.

        ``logging.Logger.warning`` is patched only for the duration of
        the call, so the mock never leaks into other tests.

        Returns:
            Tuple of the resolved example CSV path and the mock that
            recorded the ``Logger.warning`` calls.
        """
        path = Path(self.EXAMPLE_CSV).resolve()
        openvas = mock_openvas_cls()
        openvas.get_settings.return_value = {'table_driven_lsc': 1}

        notus._get_csv_filepaths = MagicMock(return_value=[path])
        for attribute_name, result in stub_results.items():
            setattr(notus, attribute_name, MagicMock(return_value=result))

        with patch.object(logging.Logger, 'warning') as mock_warning:
            notus.update_metadata()

        return path, mock_warning

    def _upload_example_csv(self, notus):
        """Feed the advisory rows of the example CSV to ``notus``.

        Lines are consumed up to and including the first one starting
        with ``{``; the remainder of the file is handed to a DictReader.
        ``logging.Logger.debug`` is patched only while the upload runs.

        Returns:
            Tuple of the upload result, the CSV file name and the mock
            that recorded the ``Logger.debug`` calls.
        """
        general_metadata_dict = {
            'VULDETECT': 'Checks if a vulnerable package version is present '
            'on the target host.',
            'SOLUTION': 'Please install the updated package(s).',
            'SOLUTION_TYPE': 'VendorFix',
            'QOD_TYPE': 'package',
        }
        path = Path(self.EXAMPLE_CSV).resolve()
        csv_file_name = PurePath(path).name

        with patch.object(logging.Logger, 'debug') as mock_debug:
            with path.open("r") as openfile:
                for line_string in openfile:
                    if line_string.startswith("{"):
                        break
                reader = DictReader(openfile)

                ret = notus.upload_lsc_from_csv_reader(
                    csv_file_name, general_metadata_dict, reader
                )

        return ret, csv_file_name, mock_debug

    # ------------------------------------------------------------------
    # Settings and paths
    # ------------------------------------------------------------------

    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_set_openvas_settings(self, MockOpenvas):
        """The handler exposes the settings reported by Openvas."""
        openvas = MockOpenvas()
        openvas.get_settings.return_value = {'nasl_no_signature_check': 0}

        notus = NotusMetadataHandler()
        no_signature_check = notus.openvas_setting.get(
            "nasl_no_signature_check"
        )

        self.assertEqual(no_signature_check, 0)

    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_metadata_path(self, MockOpenvas):
        """metadata_path is unset at first and cached after first access."""
        openvas = MockOpenvas()
        openvas.get_settings.return_value = {'plugins_folder': './tests/notus'}
        notus = NotusMetadataHandler()

        self.assertIsNone(notus._metadata_path)
        self.assertEqual(
            notus.metadata_path, f'./tests/notus/{METADATA_DIRECTORY_NAME}/'
        )
        self.assertEqual(
            notus._metadata_path, f'./tests/notus/{METADATA_DIRECTORY_NAME}/'
        )

    # ------------------------------------------------------------------
    # Checksum verification
    # ------------------------------------------------------------------

    def test_is_checksum_correct_check_disable(self):
        """With nasl_no_signature_check set to 1 any file is accepted."""
        notus = NotusMetadataHandler()
        notus._openvas_settings_dict = {'nasl_no_signature_check': 1}

        self.assertTrue(notus.is_checksum_correct(Path("foo")))

    def test_is_checksum_correct_enabled_false(self):
        """A checksum that does not match the file is rejected."""
        notus = NotusMetadataHandler(nvti=self.nvti)
        notus.nvti.get_file_checksum.return_value = "abc123"
        notus._openvas_settings_dict = {'nasl_no_signature_check': 0}

        self.assertFalse(notus.is_checksum_correct(Path(self.EXAMPLE_CSV)))

    def test_is_checksum_correct_enabled_true(self):
        """The checksum stored for the example CSV is accepted."""
        notus = NotusMetadataHandler(nvti=self.nvti)
        # NOTE(review): presumably the SHA-256 digest of the example CSV;
        # it has to be updated whenever that file changes.
        notus.nvti.get_file_checksum.return_value = (
            "2f561b9be5d1a1194f49cd5a6a024dee15a0c0bc7d94287266d0e6358e737f4e"
        )
        notus._openvas_settings_dict = {'nasl_no_signature_check': 0}

        self.assertTrue(notus.is_checksum_correct(Path(self.EXAMPLE_CSV)))

    # ------------------------------------------------------------------
    # Advisory row validation
    # ------------------------------------------------------------------

    def test_check_advisory_dict(self):
        """A completely filled advisory row passes the check."""
        advisory_dict = self._make_advisory_dict()

        notus = NotusMetadataHandler()
        self.assertTrue(notus._check_advisory_dict(advisory_dict))

    def test_check_advisory_dict_no_value(self):
        """A row holding None for a field fails the check."""
        advisory_dict = self._make_advisory_dict(CREATION_DATE=None)

        notus = NotusMetadataHandler()
        self.assertFalse(notus._check_advisory_dict(advisory_dict))

    def test_check_advisory_dict_no_package(self):
        """A row with an empty SOURCE_PKGS list fails the check."""
        advisory_dict = self._make_advisory_dict(SOURCE_PKGS="[]")

        notus = NotusMetadataHandler()
        self.assertFalse(notus._check_advisory_dict(advisory_dict))

    def test_check_advisory_dict_valerr(self):
        """A row whose SOURCE_PKGS is not a list literal fails the check."""
        advisory_dict = self._make_advisory_dict(SOURCE_PKGS="a")

        notus = NotusMetadataHandler()
        self.assertFalse(notus._check_advisory_dict(advisory_dict))

    def test_format_xrefs(self):
        """The advisory xref and all further xrefs get a URL: prefix."""
        notus = NotusMetadataHandler()
        ret = notus._format_xrefs(
            "https://example.com", ["www.foo.net", "www.bar.net"]
        )

        self.assertEqual(
            ret, "URL:https://example.com, URL:www.foo.net, URL:www.bar.net"
        )

    # ------------------------------------------------------------------
    # CSV header validation
    # ------------------------------------------------------------------

    def test_check_field_names_lsc(self):
        """The field names in their valid order are accepted."""
        notus = NotusMetadataHandler()
        field_names_list = list(self.VALID_FIELD_NAMES)

        self.assertTrue(notus._check_field_names_lsc(field_names_list))

    def test_check_field_names_lsc_unordered(self):
        """Swapping the first two field names makes the check fail."""
        notus = NotusMetadataHandler()
        field_names_list = list(self.VALID_FIELD_NAMES)
        # TITLE before OID, everything else stays in place.
        field_names_list[0], field_names_list[1] = (
            field_names_list[1],
            field_names_list[0],
        )

        self.assertFalse(notus._check_field_names_lsc(field_names_list))

    def test_check_field_names_lsc_missing(self):
        """A header lacking a field name makes the check fail."""
        notus = NotusMetadataHandler()
        # Besides the missing TITLE this list carries CVSS_BASE_VECTOR and
        # CVSS_BASE where VALID_FIELD_NAMES has the three SEVERITY_* names.
        field_names_list = [
            "OID",
            "CREATION_DATE",
            "LAST_MODIFICATION",
            "SOURCE_PKGS",
            "ADVISORY_ID",
            "CVSS_BASE_VECTOR",
            "CVSS_BASE",
            "ADVISORY_XREF",
            "DESCRIPTION",
            "INSIGHT",
            "AFFECTED",
            "CVE_LIST",
            "BINARY_PACKAGES_FOR_RELEASES",
            "XREFS",
        ]

        self.assertFalse(notus._check_field_names_lsc(field_names_list))

    def test_get_csv_filepath(self):
        """Only the example CSV is found below the metadata path."""
        path = Path(self.EXAMPLE_CSV).resolve()

        notus = NotusMetadataHandler(metadata_path="./tests/notus/")
        ret = notus._get_csv_filepaths()

        self.assertEqual(ret, [path])

    # ------------------------------------------------------------------
    # update_metadata
    # ------------------------------------------------------------------

    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_update_metadata_warning(self, MockOpenvas):
        """A failed checksum verification is reported as a warning."""
        notus = NotusMetadataHandler()

        path, mock_warning = self._run_update_metadata(
            MockOpenvas, notus, is_checksum_correct=False
        )

        mock_warning.assert_called_with('Checksum for %s failed', path)

    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_update_metadata_field_name_failed(self, MockOpenvas):
        """A failed field name check is reported as a warning."""
        notus = NotusMetadataHandler(metadata_path="./tests/notus")

        path, mock_warning = self._run_update_metadata(
            MockOpenvas,
            notus,
            is_checksum_correct=True,
            _check_field_names_lsc=False,
        )

        mock_warning.assert_called_with(
            'Field names check for %s failed', path
        )

    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_update_metadata_failed(self, MockOpenvas):
        """A failed upload is reported as a warning."""
        notus = NotusMetadataHandler(metadata_path="./tests/notus")

        path, mock_warning = self._run_update_metadata(
            MockOpenvas,
            notus,
            is_checksum_correct=True,
            _check_field_names_lsc=True,
            upload_lsc_from_csv_reader=False,
        )

        # The misspelling mirrors the message expected from the handler;
        # do not "fix" it here without changing the handler as well.
        mock_warning.assert_called_with(
            "Some advaisory was not loaded from %s", path.name
        )

    def test_update_metadata_disabled(self):
        """update_metadata returns None when Openvas is not mocked.

        NOTE(review): presumably because table_driven_lsc is not enabled
        in that case - confirm against the handler.
        """
        notus = NotusMetadataHandler(metadata_path="./tests/notus")
        ret = notus.update_metadata()
        self.assertIsNone(ret)

    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_update_metadata_success(self, MockOpenvas):
        """No warning is logged when every step succeeds."""
        notus = NotusMetadataHandler(metadata_path="./tests/notus")

        _, mock_warning = self._run_update_metadata(
            MockOpenvas,
            notus,
            is_checksum_correct=True,
            _check_field_names_lsc=True,
            upload_lsc_from_csv_reader=True,
        )

        mock_warning.assert_not_called()

    # ------------------------------------------------------------------
    # upload_lsc_from_csv_reader
    # ------------------------------------------------------------------

    def test_upload_lsc_from_csv_reader_failed(self):
        """An OspdOpenvasError from the cache makes the upload fail."""
        notus = NotusMetadataHandler(
            nvti=self.nvti, metadata_path="./tests/notus"
        )
        notus.nvti.add_vt_to_cache.side_effect = OspdOpenvasError

        ret, csv_file_name, mock_debug = self._upload_example_csv(notus)

        self.assertFalse(ret)
        mock_debug.assert_called_with(
            "Loaded %d/%d advisories from %s", 0, 1, csv_file_name
        )

    def test_upload_lsc_from_csv_reader_sucess(self):
        """The single advisory of the example CSV is loaded."""
        notus = NotusMetadataHandler(
            nvti=self.nvti, metadata_path="./tests/notus"
        )
        notus.nvti.add_vt_to_cache.return_value = None

        ret, csv_file_name, mock_debug = self._upload_example_csv(notus)

        self.assertTrue(ret)
        mock_debug.assert_called_with(
            "Loaded %d/%d advisories from %s", 1, 1, csv_file_name
        )
        )
431