open_with_flock()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 6
dl 0
loc 32
rs 7.5384
c 4
b 0
f 0
1
import os
2
import time
3
import fcntl
4
import psutil
5
import logging
6
import errno
7
8
from contextlib import contextmanager
9
10
# Constants needed for `open_with_dirlock` function.
11
MAX_ATTEMPTS = 1000  # This would correspond to be blocked for ~15min.
12
TIME_BETWEEN_ATTEMPTS = 2  # In seconds
13
14
15
def find_mount_point(path='.'):
    """ Finds the mount point used to access `path`. """
    current = os.path.abspath(path)
    # Climb toward the root until we hit a mount point; '/' is always a
    # mount point, so the walk is guaranteed to terminate.
    while not os.path.ismount(current):
        current = os.path.dirname(current)

    return current
22
23
24
def get_fs(path='.'):
    """ Gets info about the filesystem on which `path` lives. """
    mount_point = find_mount_point(path)

    # Scan every partition psutil knows about (including pseudo/virtual
    # filesystems, hence the `True` flag) for the matching mount point.
    for partition in psutil.disk_partitions(True):
        if partition.mountpoint == mount_point:
            return partition
    # Falls through to an implicit None when no partition matches.
31
32
33
@contextmanager
def open_with_flock(*args, **kwargs):
    """ Context manager for opening file with an exclusive lock.

    Opens the file with `open(*args, **kwargs)` and takes an exclusive
    `fcntl.lockf` lock on it. A non-blocking lock is attempted first; on
    contention it falls back to a blocking lock, retrying up to
    `MAX_ATTEMPTS` times when the OS reports EDEADLK (the file is closed
    and reopened between attempts). The lock is released and the file
    closed when the context exits.

    Raises:
        IOError: if the lock still cannot be acquired after `MAX_ATTEMPTS`
            attempts, or for any lock error other than EDEADLK.
    """
    f = open(*args, **kwargs)
    try:
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        no_attempt = 0
        while no_attempt < MAX_ATTEMPTS:
            try:
                logging.info("Can't immediately write-lock the file ({0}), waiting.".format(f.name))
                start_time = time.time()
                fcntl.lockf(f, fcntl.LOCK_EX)
                break
            except IOError as e:
                if e.errno == errno.EDEADLK:
                    # `logging.warning`: `logging.warn` is a deprecated alias.
                    logging.warning("The OS complained because the process have been waiting on the lockf for {0} sec with the error ({1}: {2}). Retrying. ".format(time.time() - start_time, e.errno, e.strerror))
                    f.close()
                    time.sleep(TIME_BETWEEN_ATTEMPTS)
                    f = open(*args, **kwargs)
                    no_attempt += 1
                else:
                    f.close()  # Don't leak the handle on unexpected errors.
                    raise  # Bare raise preserves the original traceback.

        if no_attempt == MAX_ATTEMPTS:
            f.close()  # Don't leak the handle on failure.
            raise IOError("Failed to lock {0} {1} times.".format(f.name, MAX_ATTEMPTS))

    try:
        yield f
    finally:
        fcntl.lockf(f, fcntl.LOCK_UN)
        f.close()
65
66
67
@contextmanager
def open_with_dirlock(*args, **kwargs):
    """ Context manager for opening file with an exclusive lock using a lock directory.

    Uses the atomicity of `os.mkdir` as the locking primitive: a hidden
    directory named ".<filename>" next to the target file acts as the lock.
    If the lock directory already exists, the acquisition is retried every
    `TIME_BETWEEN_ATTEMPTS` seconds, up to `MAX_ATTEMPTS` times. The lock
    directory is removed when the context exits.

    Note: expects the file path as the first positional argument.

    Raises:
        IOError: if the lock could not be acquired after `MAX_ATTEMPTS` attempts.
        OSError: if `os.mkdir` fails for a reason other than the lock
            already existing (e.g. missing parent directory, permissions).
    """
    dirname = os.path.dirname(args[0])
    filename = os.path.basename(args[0])
    lockfile = os.path.join(dirname, "." + filename)

    no_attempt = 0
    while no_attempt < MAX_ATTEMPTS:
        try:
            os.mkdir(lockfile)  # Atomic operation
            break
        except OSError as e:
            # Only "lock already held" is worth retrying; anything else
            # (missing directory, permissions, ...) would never succeed and
            # should surface immediately instead of blocking for ~33 min.
            if e.errno != errno.EEXIST:
                raise
            logging.info("Can't immediately write-lock the file ({0}), retrying in {1} sec.".format(filename, TIME_BETWEEN_ATTEMPTS))
            time.sleep(TIME_BETWEEN_ATTEMPTS)
            no_attempt += 1

    if no_attempt == MAX_ATTEMPTS:
        raise IOError("Failed to lock {0} {1} times.".format(filename, MAX_ATTEMPTS))

    try:
        with open(*args, **kwargs) as f:
            yield f
    finally:
        os.rmdir(lockfile)
92
93
94
def _fs_support_globalflock(fs):
95
    if fs.fstype == "lustre":
96
        return ("flock" in fs.opts) and "localflock" not in fs.opts
97
98
    elif fs.fstype == "gpfs":
99
        return True
100
101
    return False  # We don't know.
102
103
104
# Determine if we can rely on the fcntl module for locking files on the cluster.
# Otherwise, fallback on using the directory creation atomicity as a locking mechanism.
fs = get_fs('.')
if _fs_support_globalflock(fs):
    open_with_lock = open_with_flock
else:
    # `logging.warning`: `logging.warn` is a deprecated alias.
    logging.warning("Cluster does not support flock! Falling back to folder lock.")
    open_with_lock = open_with_dirlock
112