Completed
Push — master ( 1ed431...35dbab )
by
unknown
17s queued 13s
created

ospd.datapickler.DataPickler.store_data()   B

Complexity

Conditions 5

Size

Total Lines 36
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 27
nop 3
dl 0
loc 36
rs 8.7653
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Pickle Handler class
19
"""
20
21
import logging
22
import pickle
23
import os
24
25
from hashlib import sha256
26
from pathlib import Path
27
from typing import Dict, BinaryIO, Any
28
29
from ospd.errors import OspdCommandError
30
31
logger = logging.getLogger(__name__)
32
33
OWNER_ONLY_RW_PERMISSION = 0o600
34
35
36
class DataPickler:
37
    def __init__(self, storage_path: str):
38
        self._storage_path = storage_path
39
        self._storage_fd = None
40
41
    def _fd_opener(self, path: str, flags: int) -> BinaryIO:
42
        os.umask(0)
43
        flags = os.O_CREAT | os.O_WRONLY
44
        self._storage_fd = os.open(path, flags, mode=OWNER_ONLY_RW_PERMISSION)
45
        return self._storage_fd
46
47
    def _fd_close(self) -> None:
48
        try:
49
            self._storage_fd.close()
50
            self._storage_fd = None
51
        except Exception:  # pylint: disable=broad-except
52
            pass
53
54
    def remove_file(self, filename: str) -> None:
55
        """ Remove the file containing a scan_info pickled object """
56
        storage_file_path = Path(self._storage_path) / filename
57
        try:
58
            storage_file_path.unlink()
59
        except Exception as e:  # pylint: disable=broad-except
60
            logger.error('Not possible to delete %s. %s', filename, e)
61
62
    def store_data(self, filename: str, data_object: Any) -> str:
63
        """ Pickle a object and store it in a file named"""
64
        storage_file_path = Path(self._storage_path) / filename
65
66
        try:
67
            # create parent directories recursively
68
            parent_dir = storage_file_path.parent
69
            parent_dir.mkdir(parents=True, exist_ok=True)
70
        except Exception as e:  # pylint: disable=broad-except
71
            raise OspdCommandError(
72
                'Not possible to access dir for %s. %s' % (filename, e),
73
                'start_scan',
74
            )
75
76
        try:
77
            pickled_data = pickle.dumps(data_object)
78
        except pickle.PicklingError as e:
79
            raise OspdCommandError(
80
                'Not possible to pickle scan info for %s. %s' % (filename, e),
81
                'start_scan',
82
            )
83
84
        try:
85
            with open(
86
                str(storage_file_path), 'wb', opener=self._fd_opener
87
            ) as scan_info_f:
88
                scan_info_f.write(pickled_data)
89
        except Exception as e:  # pylint: disable=broad-except
90
            self._fd_close()
91
            raise OspdCommandError(
92
                'Not possible to store scan info for %s. %s' % (filename, e),
93
                'start_scan',
94
            )
95
        self._fd_close()
96
97
        return self._pickled_data_hash_generator(pickled_data)
98
99
    def load_data(self, filename: str, original_data_hash: str) -> Any:
100
        """ Unpickle the stored data in the filename. Perform an
101
        intengrity check of the read data with the the hash generated
102
        with the original data.
103
104
        Return:
105
            Dictionary containing the scan info. None otherwise.
106
        """
107
108
        storage_file_path = Path(self._storage_path) / filename
109
        pickled_data = None
110
        try:
111
            with storage_file_path.open('rb') as scan_info_f:
112
                pickled_data = scan_info_f.read()
113
        except Exception as e:  # pylint: disable=broad-except
114
            logger.error(
115
                'Not possible to read pickled data from %s. %s', filename, e
116
            )
117
            return
118
119
        unpickled_scan_info = None
120
        try:
121
            unpickled_scan_info = pickle.loads(pickled_data)
122
        except pickle.UnpicklingError as e:
123
            logger.error(
124
                'Not possible to read pickled data from %s. %s', filename, e
125
            )
126
            return
127
128
        pickled_scan_info_hash = self._pickled_data_hash_generator(pickled_data)
129
130
        if original_data_hash != pickled_scan_info_hash:
131
            logger.error('Unpickled data from %s corrupted.', filename)
132
            return
133
134
        return unpickled_scan_info
135
136
    def _pickled_data_hash_generator(self, pickled_data: bytes) -> str:
137
        """ Calculate the sha256 hash of a pickled data """
138
        if not pickled_data:
139
            return
140
141
        hash_sha256 = sha256()
142
        hash_sha256.update(pickled_data)
143
144
        return hash_sha256.hexdigest()
145