Completed
Push — master ( ee6aac...b90bd1 )
by Björn
28s queued 12s
created

LockFileTestCase.test_get_csv_filepath()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
import logging
20
import unittest
21
22
from csv import DictReader
23
from pathlib import Path, PurePath
24
from collections import OrderedDict
25
from unittest.mock import patch, MagicMock
26
27
from ospd_openvas.notus.metadata import (
28
    NotusMetadataHandler,
29
    EXPECTED_FIELD_NAMES_LIST,
30
    METADATA_DIRECTORY_NAME,
31
)
32
from ospd_openvas.errors import OspdOpenvasError
33
34
35
class LockFileTestCase(unittest.TestCase):
36
    @patch('ospd_openvas.nvticache.NVTICache')
    def setUp(self, MockNvti):
        """Store a mocked NVTI cache instance on the test case as ``nvti``."""
        nvti_mock = MockNvti()
        self.nvti = nvti_mock
39
40
    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_set_openvas_settings(self, MockOpenvas):
        """Expect ``openvas_setting`` to carry the mocked Openvas settings."""
        mocked_settings = {'nasl_no_signature_check': 0}
        MockOpenvas().get_settings.return_value = mocked_settings

        handler = NotusMetadataHandler()
        settings = handler.openvas_setting

        self.assertEqual(settings.get("nasl_no_signature_check"), 0)
51
52
    @patch('ospd_openvas.notus.metadata.Openvas')
    def test_metadata_path(self, MockOpenvas):
        """Expect ``metadata_path`` to be built from ``plugins_folder``.

        ``_metadata_path`` must be ``None`` before the property is read and
        equal to the property value afterwards.
        """
        MockOpenvas().get_settings.return_value = {
            'plugins_folder': './tests/notus'
        }
        handler = NotusMetadataHandler()
        expected_path = f'./tests/notus/{METADATA_DIRECTORY_NAME}/'

        self.assertIsNone(handler._metadata_path)
        self.assertEqual(handler.metadata_path, expected_path)
        self.assertEqual(handler._metadata_path, expected_path)
65
66
    def test_is_checksum_correct_check_disable(self):
        """Expect the check to pass when ``nasl_no_signature_check`` is 1."""
        handler = NotusMetadataHandler()
        handler._openvas_settings_dict = {'nasl_no_signature_check': 1}

        outcome = handler.is_checksum_correct(Path("foo"))

        self.assertTrue(outcome)
71
72
    def test_is_checksum_correct_enabled_false(self):
        """Expect the check to fail when the cache reports ``abc123``."""
        handler = NotusMetadataHandler(nvti=self.nvti)
        handler.nvti.get_file_checksum.return_value = "abc123"
        handler._openvas_settings_dict = {'nasl_no_signature_check': 0}
        csv_file = Path("./tests/notus/example.csv")

        outcome = handler.is_checksum_correct(csv_file)

        self.assertFalse(outcome)
80
81
    def test_is_checksum_correct_enabled_true(self):
        """Expect the check to pass for the digest stored in the cache mock.

        NOTE(review): the hex digest is presumably the checksum of
        ``tests/notus/example.csv`` -- confirm if the fixture changes.
        """
        cached_digest = (
            "aafbaf3fac1c64a4006a02ed4657b975f47b07cd2915c8186ff1739ac6216e72"
        )
        handler = NotusMetadataHandler(nvti=self.nvti)
        handler.nvti.get_file_checksum.return_value = cached_digest
        handler._openvas_settings_dict = {'nasl_no_signature_check': 0}
        csv_file = Path("./tests/notus/example.csv")

        outcome = handler.is_checksum_correct(csv_file)

        self.assertTrue(outcome)
91
92
    def test_check_advisory_dict(self):
        """Expect a fully populated advisory to be accepted."""
        # Keyword arguments keep their order, so the resulting OrderedDict
        # has the same key sequence as a list of pairs would give.
        advisory = OrderedDict(
            OID='1.3.6.1.4.1.25623.1.1.2.2020.1234',
            TITLE='VendorOS: Security Advisory for git (VendorOS-2020-1234)',
            CREATION_DATE='1600269468',
            LAST_MODIFICATION='1601380531',
            SOURCE_PKGS="['git']",
            ADVISORY_ID='VendorOS-2020-1234',
            CVSS_BASE_VECTOR='AV:N/AC:L/Au:N/C:C/I:C/A:C',
            CVSS_BASE='10.0',
            ADVISORY_XREF='https://example.com',
            DESCRIPTION='The remote host is missing an update.',
            INSIGHT='buffer overflow',
            AFFECTED="'p1' package(s) on VendorOS V2.0SP1",
            CVE_LIST="['CVE-2020-1234']",
            BINARY_PACKAGES_FOR_RELEASES="{'VendorOS V2.0SP1': ['p1-1.1']}",
            XREFS='[]',
        )
        handler = NotusMetadataHandler()

        accepted = handler._check_advisory_dict(advisory)

        self.assertTrue(accepted)
121
122
    def test_check_advisory_dict_no_value(self):
        """Expect an advisory whose ``CREATION_DATE`` is ``None`` to fail."""
        # Keyword arguments keep their order, so the resulting OrderedDict
        # has the same key sequence as a list of pairs would give.
        advisory = OrderedDict(
            OID='1.3.6.1.4.1.25623.1.1.2.2020.1234',
            TITLE='VendorOS: Security Advisory for git (VendorOS-2020-1234)',
            CREATION_DATE=None,
            LAST_MODIFICATION='1601380531',
            SOURCE_PKGS="['git']",
            ADVISORY_ID='VendorOS-2020-1234',
            CVSS_BASE_VECTOR='AV:N/AC:L/Au:N/C:C/I:C/A:C',
            CVSS_BASE='10.0',
            ADVISORY_XREF='https://example.com',
            DESCRIPTION='The remote host is missing an update.',
            INSIGHT='buffer overflow',
            AFFECTED="'p1' package(s) on VendorOS V2.0SP1",
            CVE_LIST="['CVE-2020-1234']",
            BINARY_PACKAGES_FOR_RELEASES="{'VendorOS V2.0SP1': ['p1-1.1']}",
            XREFS='[]',
        )
        handler = NotusMetadataHandler()

        accepted = handler._check_advisory_dict(advisory)

        self.assertFalse(accepted)
151
152
    def test_check_advisory_dict_no_package(self):
        """Expect an advisory with an empty ``SOURCE_PKGS`` list to fail."""
        # Keyword arguments keep their order, so the resulting OrderedDict
        # has the same key sequence as a list of pairs would give.
        advisory = OrderedDict(
            OID='1.3.6.1.4.1.25623.1.1.2.2020.1234',
            TITLE='VendorOS: Security Advisory for git (VendorOS-2020-1234)',
            CREATION_DATE='1600269468',
            LAST_MODIFICATION='1601380531',
            SOURCE_PKGS="[]",
            ADVISORY_ID='VendorOS-2020-1234',
            CVSS_BASE_VECTOR='AV:N/AC:L/Au:N/C:C/I:C/A:C',
            CVSS_BASE='10.0',
            ADVISORY_XREF='https://example.com',
            DESCRIPTION='The remote host is missing an update.',
            INSIGHT='buffer overflow',
            AFFECTED="'p1' package(s) on VendorOS V2.0SP1",
            CVE_LIST="['CVE-2020-1234']",
            BINARY_PACKAGES_FOR_RELEASES="{'VendorOS V2.0SP1': ['p1-1.1']}",
            XREFS='[]',
        )
        handler = NotusMetadataHandler()

        accepted = handler._check_advisory_dict(advisory)

        self.assertFalse(accepted)
181
182
    def test_check_advisory_dict_valerr(self):
        """Expect an advisory with ``SOURCE_PKGS`` set to ``"a"`` to fail.

        NOTE(review): judging by the test name this presumably exercises a
        ``ValueError`` path while parsing the field -- confirm in the handler.
        """
        # Keyword arguments keep their order, so the resulting OrderedDict
        # has the same key sequence as a list of pairs would give.
        advisory = OrderedDict(
            OID='1.3.6.1.4.1.25623.1.1.2.2020.1234',
            TITLE='VendorOS: Security Advisory for git (VendorOS-2020-1234)',
            CREATION_DATE='1600269468',
            LAST_MODIFICATION='1601380531',
            SOURCE_PKGS="a",
            ADVISORY_ID='VendorOS-2020-1234',
            CVSS_BASE_VECTOR='AV:N/AC:L/Au:N/C:C/I:C/A:C',
            CVSS_BASE='10.0',
            ADVISORY_XREF='https://example.com',
            DESCRIPTION='The remote host is missing an update.',
            INSIGHT='buffer overflow',
            AFFECTED="'p1' package(s) on VendorOS V2.0SP1",
            CVE_LIST="['CVE-2020-1234']",
            BINARY_PACKAGES_FOR_RELEASES="{'VendorOS V2.0SP1': ['p1-1.1']}",
            XREFS='[]',
        )
        handler = NotusMetadataHandler()

        accepted = handler._check_advisory_dict(advisory)

        self.assertFalse(accepted)
211
212
    def test_format_xrefs(self):
        """Expect every reference to be prefixed with ``URL:`` and joined."""
        advisory_xref = "https://example.com"
        extra_xrefs = ["www.foo.net", "www.bar.net"]
        expected = "URL:https://example.com, URL:www.foo.net, URL:www.bar.net"

        handler = NotusMetadataHandler()
        formatted = handler._format_xrefs(advisory_xref, extra_xrefs)

        self.assertEqual(formatted, expected)
221
222
    def test_check_field_names_lsc(self):
        """Expect the complete, correctly ordered header to be accepted."""
        handler = NotusMetadataHandler()
        header = [
            "OID", "TITLE", "CREATION_DATE", "LAST_MODIFICATION",
            "SOURCE_PKGS", "ADVISORY_ID", "CVSS_BASE_VECTOR", "CVSS_BASE",
            "ADVISORY_XREF", "DESCRIPTION", "INSIGHT", "AFFECTED",
            "CVE_LIST", "BINARY_PACKAGES_FOR_RELEASES", "XREFS"
        ]

        accepted = handler._check_field_names_lsc(header)

        self.assertTrue(accepted)
243
244
    def test_check_field_names_lsc_unordered(self):
        """Expect a header with ``TITLE`` and ``OID`` swapped to be rejected."""
        handler = NotusMetadataHandler()
        header = [
            "TITLE", "OID", "CREATION_DATE", "LAST_MODIFICATION",
            "SOURCE_PKGS", "ADVISORY_ID", "CVSS_BASE_VECTOR", "CVSS_BASE",
            "ADVISORY_XREF", "DESCRIPTION", "INSIGHT", "AFFECTED",
            "CVE_LIST", "BINARY_PACKAGES_FOR_RELEASES", "XREFS"
        ]

        accepted = handler._check_field_names_lsc(header)

        self.assertFalse(accepted)
265
266
    def test_check_field_names_lsc_missing(self):
        """Expect a header that lacks the ``TITLE`` column to be rejected."""
        handler = NotusMetadataHandler()
        header = [
            "OID", "CREATION_DATE", "LAST_MODIFICATION",
            "SOURCE_PKGS", "ADVISORY_ID", "CVSS_BASE_VECTOR", "CVSS_BASE",
            "ADVISORY_XREF", "DESCRIPTION", "INSIGHT", "AFFECTED",
            "CVE_LIST", "BINARY_PACKAGES_FOR_RELEASES", "XREFS"
        ]

        accepted = handler._check_field_names_lsc(header)

        self.assertFalse(accepted)
286
287
    def test_get_csv_filepath(self):
        """Expect exactly the resolved ``example.csv`` path to be returned."""
        expected_paths = [Path("./tests/notus/example.csv").resolve()]

        handler = NotusMetadataHandler(metadata_path="./tests/notus/")
        found_paths = handler._get_csv_filepaths()

        self.assertEqual(found_paths, expected_paths)
294
295
    def test_update_metadata_warning(self):
        """Expect a warning to be logged when the checksum check fails.

        ``logging.Logger.warning`` is replaced through ``patch.object`` so
        the real method is restored as soon as the ``with`` block ends.
        Assigning a ``MagicMock`` to the class attribute directly would
        leave every logger in the process mocked for all later tests.
        """
        notus = NotusMetadataHandler()
        path = Path("./tests/notus/example.csv").resolve()

        # Short-circuit file discovery and force the checksum to fail.
        notus._get_csv_filepaths = MagicMock(return_value=[path])
        notus.is_checksum_correct = MagicMock(return_value=False)

        with patch.object(logging.Logger, 'warning') as warning_mock:
            notus.update_metadata()

        # Plain string: the message has no placeholders for an f-string.
        warning_mock.assert_called_with('Checksum for %s failed', path)
307
308
    def test_update_metadata_field_name_failed(self):
        """Expect a warning to be logged when the field name check fails.

        ``logging.Logger.warning`` is replaced through ``patch.object`` so
        the real method is restored as soon as the ``with`` block ends.
        Assigning a ``MagicMock`` to the class attribute directly would
        leave every logger in the process mocked for all later tests.
        """
        notus = NotusMetadataHandler(metadata_path="./tests/notus")
        path = Path("./tests/notus/example.csv").resolve()

        # Checksum passes, header validation fails.
        notus._get_csv_filepaths = MagicMock(return_value=[path])
        notus.is_checksum_correct = MagicMock(return_value=True)
        notus._check_field_names_lsc = MagicMock(return_value=False)

        with patch.object(logging.Logger, 'warning') as warning_mock:
            notus.update_metadata()

        # Plain string: the message has no placeholders for an f-string.
        warning_mock.assert_called_with(
            'Field names check for %s failed', path
        )
322
323
    def test_update_metadata_failed(self):
        """Expect a warning to be logged when the CSV upload reports failure.

        ``logging.Logger.warning`` is replaced through ``patch.object`` so
        the real method is restored as soon as the ``with`` block ends.
        Assigning a ``MagicMock`` to the class attribute directly would
        leave every logger in the process mocked for all later tests.
        """
        notus = NotusMetadataHandler(metadata_path="./tests/notus")
        path = Path("./tests/notus/example.csv").resolve()

        # All checks pass; only the upload step reports a failure.
        notus._get_csv_filepaths = MagicMock(return_value=[path])
        notus.is_checksum_correct = MagicMock(return_value=True)
        notus._check_field_names_lsc = MagicMock(return_value=True)
        notus.upload_lsc_from_csv_reader = MagicMock(return_value=False)

        with patch.object(logging.Logger, 'warning') as warning_mock:
            notus.update_metadata()

        # NOTE(review): the spelling "advaisory" is kept as-is; presumably
        # it mirrors the message emitted by the handler -- fix both together.
        warning_mock.assert_called_with(
            "Some advaisory was not loaded from %s", path.name
        )
338
339
    def test_update_metadata_success(self):
        """Expect no warning to be logged when every step succeeds.

        ``logging.Logger.warning`` is replaced through ``patch.object`` so
        the real method is restored as soon as the ``with`` block ends.
        Assigning a ``MagicMock`` to the class attribute directly would
        leave every logger in the process mocked for all later tests.
        """
        notus = NotusMetadataHandler(metadata_path="./tests/notus")
        path = Path("./tests/notus/example.csv").resolve()

        # Checksum, header validation and upload all report success.
        notus._get_csv_filepaths = MagicMock(return_value=[path])
        notus.is_checksum_correct = MagicMock(return_value=True)
        notus._check_field_names_lsc = MagicMock(return_value=True)
        notus.upload_lsc_from_csv_reader = MagicMock(return_value=True)

        with patch.object(logging.Logger, 'warning') as warning_mock:
            notus.update_metadata()

        warning_mock.assert_not_called()
353
354 View Code Duplication
    def test_upload_lsc_from_csv_reader_failed(self):
        """Expect the upload to fail when adding a VT to the cache raises.

        ``add_vt_to_cache`` is made to raise ``OspdOpenvasError``; the
        upload must then return a falsy value and log 0 of 1 advisories.

        ``logging.Logger.debug`` is replaced through ``patch.object`` so
        the real method is restored as soon as the ``with`` block ends.
        Assigning a ``MagicMock`` to the class attribute directly would
        leave every logger in the process mocked for all later tests.
        """
        general_metadata_dict = {
            'VULDETECT': 'Checks if a vulnerable package version is present on the target host.',
            'SOLUTION': 'Please install the updated package(s).',
            'SOLUTION_TYPE': 'VendorFix',
            'QOD_TYPE': 'package',
        }

        notus = NotusMetadataHandler(
            nvti=self.nvti, metadata_path="./tests/notus"
        )
        notus.nvti.add_vt_to_cache.side_effect = OspdOpenvasError
        path = Path("./tests/notus/example.csv").resolve()
        purepath = PurePath(path).name

        with patch.object(
            logging.Logger, 'debug'
        ) as debug_mock, path.open("r") as openfile:
            # Consume lines up to and including the first one starting
            # with "{", so the reader starts on the line after it.
            for line_string in openfile:
                if line_string.startswith("{"):
                    break
            reader = DictReader(openfile)

            ret = notus.upload_lsc_from_csv_reader(
                purepath, general_metadata_dict, reader
            )

        self.assertFalse(ret)
        debug_mock.assert_called_with(
            "Loaded %d/%d advisories from %s", 0, 1, purepath
        )
383
384 View Code Duplication
    def test_upload_lsc_from_csv_reader_sucess(self):
        """Expect the upload to succeed when the cache accepts the VT.

        ``add_vt_to_cache`` returns ``None``; the upload must then return
        a truthy value and log 1 of 1 advisories.

        ``logging.Logger.debug`` is replaced through ``patch.object`` so
        the real method is restored as soon as the ``with`` block ends.
        Assigning a ``MagicMock`` to the class attribute directly would
        leave every logger in the process mocked for all later tests.
        """
        general_metadata_dict = {
            'VULDETECT': 'Checks if a vulnerable package version is present on the target host.',
            'SOLUTION': 'Please install the updated package(s).',
            'SOLUTION_TYPE': 'VendorFix',
            'QOD_TYPE': 'package',
        }

        notus = NotusMetadataHandler(
            nvti=self.nvti, metadata_path="./tests/notus"
        )
        notus.nvti.add_vt_to_cache.return_value = None
        path = Path("./tests/notus/example.csv").resolve()
        purepath = PurePath(path).name

        with patch.object(
            logging.Logger, 'debug'
        ) as debug_mock, path.open("r") as openfile:
            # Consume lines up to and including the first one starting
            # with "{", so the reader starts on the line after it.
            for line_string in openfile:
                if line_string.startswith("{"):
                    break
            reader = DictReader(openfile)

            ret = notus.upload_lsc_from_csv_reader(
                purepath, general_metadata_dict, reader
            )

        self.assertTrue(ret)
        debug_mock.assert_called_with(
            "Loaded %d/%d advisories from %s", 1, 1, purepath
        )
413