Passed
Pull Request — master (#257)
by Juan José
01:25
created

ospd_openvas.lock.LockFile.is_locked()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014-2020 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
import logging
21
import time
22
import fcntl
23
24
from pathlib import Path
25
26
logger = logging.getLogger(__name__)


class LockFile:
    """Exclusive, non-blocking advisory lock backed by a lock file.

    The lock is taken with ``fcntl.flock`` on a file opened at the given
    path. It can be used as a context manager::

        with LockFile(path) as lock:
            if lock.has_lock():
                ...

    Entering the context does NOT guarantee the lock was obtained; callers
    must check ``has_lock()``.
    """

    def __init__(self, path: Path):
        """Store the lock file location; nothing is opened or locked yet.

        Arguments:
            path: Location of the lock file. Missing parent directories are
                created when the lock is acquired.
        """
        self._lock_file_path = path
        self._has_lock = False
        # Open file object holding the flock; None while no file is open.
        self._fd = None

    def has_lock(self) -> bool:
        """Return True if this instance currently holds the lock."""
        return self._has_lock

    def _close_fd(self) -> None:
        """Close the lock file handle, if any, and forget it (best effort)."""
        fd, self._fd = self._fd, None
        if fd is None:
            return

        try:
            fd.close()
        except OSError as e:
            # Closing is best effort: the handle is dropped either way.
            logger.debug(
                "Failed to close lock file %s. %s",
                str(self._lock_file_path),
                e,
            )

    def _acquire_lock(self) -> "LockFile":
        """Try once to acquire the lock without blocking.

        Creates the parent directories and the lock file if needed and then
        requests an exclusive, non-blocking flock on it. Failing to create
        the file or finding it already locked is logged and leaves
        ``has_lock()`` False; no exception is raised in those cases.

        Returns:
            This instance, whether or not the lock was obtained.

        Raises:
            OSError: If flock fails for a reason other than the file being
                locked already. The file handle is closed before re-raising.
        """
        if self.has_lock():
            return self

        try:
            # create parent directories recursively
            parent_dir = self._lock_file_path.parent
            parent_dir.mkdir(parents=True, exist_ok=True)

            self._fd = self._lock_file_path.open('w')
        except Exception as e:  # pylint: disable=broad-except
            logger.error(
                "Failed to create lock file %s. %s",
                str(self._lock_file_path),
                e,
            )
            # _fd is normally still None here; _close_fd copes with that.
            self._close_fd()
            return self

        # Try to acquire the lock.
        try:
            fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError as e:
            logger.error(
                "Failed to lock the file %s. %s", str(self._lock_file_path), e,
            )
            self._close_fd()
        except OSError:
            # Unexpected flock failure: do not leak the open handle.
            self._close_fd()
            raise
        else:
            self._has_lock = True
            logger.debug("Created lock file %s.", str(self._lock_file_path))

        return self

    def wait_for_lock(self) -> "LockFile":
        """Block until the lock is acquired, retrying every 10 seconds.

        Returns immediately once the lock is held; the pause only happens
        between failed attempts.

        Returns:
            This instance, holding the lock.
        """
        while True:
            self._acquire_lock()
            if self.has_lock():
                return self

            time.sleep(10)

    def _release_lock(self) -> None:
        """Release the lock and close the lock file.

        The lock file itself is left in place on disk. Does nothing if the
        lock is not held.
        """
        if self.has_lock() and self._fd:
            try:
                fcntl.flock(self._fd, fcntl.LOCK_UN)
            finally:
                # Closing the handle drops the flock even if LOCK_UN failed,
                # so the state is reset unconditionally.
                self._close_fd()
                self._has_lock = False

            logger.debug(
                "Removed lock from file %s.", str(self._lock_file_path)
            )

    def __enter__(self) -> "LockFile":
        """Attempt to take the lock once; check ``has_lock()`` afterwards."""
        self._acquire_lock()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        """Release the lock, if held, when leaving the context."""
        self._release_lock()
105