Completed
Push — master ( 1d74fa...8e652f )
by Marc-Alexandre
01:02
created

open_with_flock()   B

Complexity

Conditions 6

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 29
rs 7.5384
1
import os
2
import time
3
import fcntl
4
import psutil
5
import logging
6
import errno
7
8
from contextlib import contextmanager
9
10
# Constants bounding the retry loops of `open_with_flock` and `open_with_dirlock`.
11
MAX_ATTEMPTS = 1000  # With a 2 sec pause per failed attempt, `open_with_dirlock` gives up after ~33min.
12
TIME_BETWEEN_ATTEMPTS = 2  # In seconds; only `open_with_dirlock` sleeps between attempts.
13
14
15
def find_mount_point(path='.'):
    """ Returns the closest ancestor of `path` (or `path` itself) that is a mount point.

    `path` is first made absolute, then its parents are walked upwards
    until `os.path.ismount` accepts one of them.
    """
    candidate = os.path.abspath(path)
    while True:
        if os.path.ismount(candidate):
            return candidate
        # Not a mount point yet: climb one directory up and test again.
        candidate = os.path.dirname(candidate)
22
23
24
def get_fs(path='.'):
    """ Looks up the partition entry of the filesystem holding `path`.

    The mount point of `path` is compared against every entry reported by
    `psutil.disk_partitions(True)`; the first entry with the same mount point
    is returned, or None when no entry matches.
    """
    target = find_mount_point(path)
    matching = (part for part in psutil.disk_partitions(True)
                if part.mountpoint == target)
    return next(matching, None)
31
32
33
@contextmanager
34
def open_with_flock(*args, **kwargs):
35
    """ Context manager for opening file with an exclusive lock. """
36
    f = open(*args, **kwargs)
37
    try:
38
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
39
    except IOError:
40
        no_attempt = 0
41
        while no_attempt < MAX_ATTEMPTS:
42
            try:
43
                logging.info("Can't immediately write-lock the file ({0}), waiting.".format(f.name))
44
                start_time = time.time()
45
                fcntl.lockf(f, fcntl.LOCK_EX)
46
                break
47
            except IOError as e:
48
                if e.errno == errno.EDEADLK:
49
                    logging.warn("The OS complained because the process have been waiting on the lockf for {0} sec with the error ({1}: {2}). Retrying. ".format(time.time() - start_time), e.errno, e.strerror)
50
                    no_attempt += 1
51
                else:
52
                    raise e
53
54
        if no_attempt == MAX_ATTEMPTS:
55
            raise IOError("Failed to lock {0} {1} times.".format(f.name, MAX_ATTEMPTS))
56
57
    try:
58
        yield f
59
    finally:
60
        fcntl.lockf(f, fcntl.LOCK_UN)
61
        f.close()
62
63
64
@contextmanager
def open_with_dirlock(*args, **kwargs):
    """ Context manager for opening file with an exclusive lock using a lock directory.

    The lock is a directory named "." + <file name>, created next to the
    file given as first positional argument. All arguments are forwarded to
    the builtin `open` once the lock directory has been created.

    While the lock directory already exists, creation is retried every
    `TIME_BETWEEN_ATTEMPTS` seconds, at most `MAX_ATTEMPTS` times.

    Yields the opened file object. The file is closed and the lock directory
    removed when the context exits.

    Raises IOError if the lock could not be obtained after `MAX_ATTEMPTS`
    attempts, and OSError right away when the lock directory cannot be
    created for a reason other than already existing (e.g. missing parent
    directory, permission denied).
    """
    dirname = os.path.dirname(args[0])
    filename = os.path.basename(args[0])
    lockfile = os.path.join(dirname, "." + filename)

    for _ in range(MAX_ATTEMPTS):
        try:
            os.mkdir(lockfile)  # Atomic operation
            break
        except OSError as e:
            # Only EEXIST means "someone else holds the lock"; waiting cannot
            # fix any other failure, so report it immediately.
            if e.errno != errno.EEXIST:
                raise
            logging.info("Can't immediately write-lock the file ({0}), retrying in {1} sec.".format(filename, TIME_BETWEEN_ATTEMPTS))
            time.sleep(TIME_BETWEEN_ATTEMPTS)
    else:
        # Loop exhausted without ever reaching `break`.
        raise IOError("Failed to lock {0} {1} times.".format(filename, MAX_ATTEMPTS))

    try:
        with open(*args, **kwargs) as f:
            yield f
    finally:
        os.rmdir(lockfile)
89
90
91
def _fs_support_globalflock(fs):
92
    if fs.fstype == "lustre":
93
        return ("flock" in fs.opts) and "localflock" not in fs.opts
94
95
    elif fs.fstype == "gpfs":
96
        return True
97
98
    return False  # We don't know.
99
100
101
# Determine if we can rely on the fcntl module for locking files on the cluster.
# Otherwise, fall back on using the directory creation atomicity as a locking mechanism.
# NOTE: this runs at import time and inspects the filesystem of the current
# working directory, not the one of the files that will be locked later on.
fs = get_fs('.')
# `get_fs` returns None when no partition matches; treat that as "no flock support".
if fs is not None and _fs_support_globalflock(fs):
    open_with_lock = open_with_flock
else:
    logging.warning("Cluster does not support flock! Falling back to folder lock.")
    open_with_lock = open_with_dirlock
109