Completed
Push — master ( 62acd1...d58222 )
by
unknown
13s queued 11s
created

ospd.misc   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 135
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 75
dl 0
loc 135
rs 10
c 0
b 0
f 0
wmc 24

2 Methods

Rating   Name   Duplication   Size   Complexity  
A ResultType.get_type() 0 13 5
A ResultType.get_str() 0 13 5

5 Functions

Rating   Name   Duplication   Size   Complexity  
A create_process() 0 4 1
A valid_uuid() 0 8 2
A go_to_background() 0 8 3
A remove_pidfile() 0 12 4
A create_pid() 0 21 4
1
# Copyright (C) 2014-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
# pylint: disable=too-many-lines
20
21
""" Miscellaneous classes and functions related to OSPD.
22
"""
23
24
import logging
25
import os
26
import sys
27
import uuid
28
import multiprocessing
29
30
from typing import Any, Callable, Iterable
31
from pathlib import Path
32
33
LOGGER = logging.getLogger(__name__)
34
35
36
def create_process(
37
    func: Callable, *, args: Iterable[Any] = None
38
) -> multiprocessing.Process:
39
    return multiprocessing.Process(target=func, args=args)
40
41
42
class ResultType(object):
43
44
    """ Various scan results types values. """
45
46
    ALARM = 0
47
    LOG = 1
48
    ERROR = 2
49
    HOST_DETAIL = 3
50
51
    @classmethod
52
    def get_str(cls, result_type: int) -> str:
53
        """ Return string name of a result type. """
54
        if result_type == cls.ALARM:
55
            return "Alarm"
56
        elif result_type == cls.LOG:
57
            return "Log Message"
58
        elif result_type == cls.ERROR:
59
            return "Error Message"
60
        elif result_type == cls.HOST_DETAIL:
61
            return "Host Detail"
62
        else:
63
            assert False, "Erroneous result type {0}.".format(result_type)
64
65
    @classmethod
66
    def get_type(cls, result_name: str) -> int:
67
        """ Return string name of a result type. """
68
        if result_name == "Alarm":
69
            return cls.ALARM
70
        elif result_name == "Log Message":
71
            return cls.LOG
72
        elif result_name == "Error Message":
73
            return cls.ERROR
74
        elif result_name == "Host Detail":
75
            return cls.HOST_DETAIL
76
        else:
77
            assert False, "Erroneous result name {0}.".format(result_name)
78
79
80
def valid_uuid(value) -> bool:
81
    """ Check if value is a valid UUID. """
82
83
    try:
84
        uuid.UUID(value, version=4)
85
        return True
86
    except (TypeError, ValueError, AttributeError):
87
        return False
88
89
90
def go_to_background() -> None:
91
    """ Daemonize the running process. """
92
    try:
93
        if os.fork():
94
            sys.exit()
95
    except OSError as errmsg:
96
        LOGGER.error('Fork failed: %s', errmsg)
97
        sys.exit(1)
98
99
100
def create_pid(pidfile: str) -> bool:
101
    """ Check if there is an already running daemon and creates the pid file.
102
    Otherwise gives an error. """
103
104
    pid = str(os.getpid())
105
    pidpath = Path(pidfile)
106
107
    if pidpath.is_file():
108
        LOGGER.error("There is an already running process.")
109
        return False
110
111
    try:
112
        with pidpath.open(mode='w') as f:
113
            f.write(pid)
114
    except (FileNotFoundError, PermissionError) as e:
115
        LOGGER.error(
116
            "Failed to create pid file %s. %s", str(pidpath.absolute()), e
117
        )
118
        return False
119
120
    return True
121
122
123
def remove_pidfile(pidfile: str, _signum=None, _frame=None) -> None:
124
    """ Removes the pidfile before ending the daemon. """
125
    pidpath = Path(pidfile)
126
127
    if not pidpath.is_file():
128
        return
129
130
    with pidpath.open() as f:
131
        if int(f.read()) == os.getpid():
132
            LOGGER.debug("Finishing daemon process")
133
            pidpath.unlink()
134
            sys.exit()
135