Completed
Push — master ( 410638...e4e097 )
by
unknown
23s queued 12s
created

ospd.misc.create_pid()   B

Complexity

Conditions 7

Size

Total Lines 47
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 33
nop 1
dl 0
loc 47
rs 7.688
c 0
b 0
f 0
1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
# pylint: disable=too-many-lines
19
20
""" Miscellaneous classes and functions related to OSPD.
21
"""
22
23
import logging
24
import os
25
import sys
26
import uuid
27
import multiprocessing
28
29
from typing import Any, Callable, Iterable
30
from pathlib import Path
31
32
import psutil
33
34
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
35
36
37
def create_process(
    func: Callable, *, args: Iterable[Any] = None
) -> multiprocessing.Process:
    """Build a (not yet started) process that will run ``func``.

    Arguments:
        func: Callable used as the target of the new process.
        args: Positional arguments passed to ``func``. ``None`` (the
            default) means "no arguments".

    Returns:
        A ``multiprocessing.Process`` object; the caller is responsible
        for starting it.
    """
    # multiprocessing.Process converts ``args`` with tuple(), so forwarding
    # the ``None`` default would raise TypeError. Map it to an empty tuple.
    if args is None:
        args = ()
    return multiprocessing.Process(target=func, args=args)
41
42
43
class ResultType:
    """Integer codes of the scan result types and their display names."""

    ALARM = 0
    LOG = 1
    ERROR = 2
    HOST_DETAIL = 3

    @classmethod
    def _pairs(cls):
        """Return the (code, display name) pairs in lookup order."""
        return (
            (cls.ALARM, "Alarm"),
            (cls.LOG, "Log Message"),
            (cls.ERROR, "Error Message"),
            (cls.HOST_DETAIL, "Host Detail"),
        )

    @classmethod
    def get_str(cls, result_type: int) -> str:
        """Return the display name of a result type code.

        Arguments:
            result_type: One of the integer codes defined on this class.

        Returns:
            The matching display name, e.g. ``"Alarm"``.

        Raises:
            AssertionError: If ``result_type`` is not a known code.
        """
        for code, name in cls._pairs():
            if result_type == code:
                return name
        # Raised explicitly instead of ``assert False``: assert statements
        # are stripped under ``python -O``, which would make this method
        # silently return None for unknown input.
        raise AssertionError(
            "Erroneous result type {0}.".format(result_type)
        )

    @classmethod
    def get_type(cls, result_name: str) -> int:
        """Return the result type code for a display name.

        Arguments:
            result_name: A display name as returned by ``get_str``.

        Returns:
            The matching integer code, e.g. ``ResultType.ALARM``.

        Raises:
            AssertionError: If ``result_name`` is not a known name.
        """
        for code, name in cls._pairs():
            if result_name == name:
                return code
        # See get_str: an explicit raise survives ``python -O``.
        raise AssertionError(
            "Erroneous result name {0}.".format(result_name)
        )
79
80
81
def valid_uuid(value) -> bool:
    """Tell whether ``value`` can be parsed by ``uuid.UUID``.

    Returns True when ``uuid.UUID(value, version=4)`` accepts the value,
    and False when it raises TypeError, ValueError or AttributeError.
    """
    try:
        uuid.UUID(value, version=4)
    except (TypeError, ValueError, AttributeError):
        return False
    return True
89
90
91
def go_to_background() -> None:
    """Fork the running process and let the parent exit.

    The parent (where ``os.fork()`` returns a non-zero value) exits; the
    child returns normally from this function. If the fork fails, the error
    is logged and the process exits with status 1.
    """
    try:
        fork_result = os.fork()
    except OSError as errmsg:
        LOGGER.error('Fork failed: %s', errmsg)
        sys.exit(1)

    # A non-zero result means we are the parent: leave the child running.
    if fork_result:
        sys.exit()
99
100
101
def create_pid(pidfile: str) -> bool:
    """Write the current PID to ``pidfile`` unless a daemon already runs.

    If ``pidfile`` already exists, the PID stored in it is looked up. When
    that PID belongs to a live process with the same name as the current
    process, another instance is assumed to be running and nothing is
    written. Otherwise the existing pid file is considered stale and is
    overwritten.

    Arguments:
        pidfile: Path of the pid file to check and create.

    Returns:
        True if the pid file was written. False if a process with the same
        name is already running under the recorded PID, or if the pid file
        could not be written.
    """
    own_pid = os.getpid()
    new_process_name = psutil.Process(own_pid).name()

    pidpath = Path(pidfile)

    if pidpath.is_file():
        process_name = None
        with pidpath.open('r') as pid_handle:
            current_pid = pid_handle.read().strip()

        try:
            process_name = psutil.Process(int(current_pid)).name()
        except (ValueError, psutil.NoSuchProcess):
            # ValueError: the file is empty or does not hold a valid PID.
            # NoSuchProcess: the recorded PID is not running any more.
            # Either way the pid file is stale; process_name stays None.
            pass

        if process_name == new_process_name:
            LOGGER.error(
                "There is an already running process. See %s.",
                str(pidpath.absolute()),
            )
            return False

        LOGGER.debug(
            "There is an existing pid file '%s', but the PID %s belongs to "
            "the process %s. It seems that %s was abruptly stopped. "
            "Removing the pid file.",
            str(pidpath.absolute()),
            current_pid,
            process_name,
            new_process_name,
        )

    try:
        with pidpath.open(mode='w') as pid_handle:
            pid_handle.write(str(own_pid))
    except (FileNotFoundError, PermissionError) as e:
        LOGGER.error(
            "Failed to create pid file %s. %s", str(pidpath.absolute()), e
        )
        return False

    return True
148