Completed
Pull Request — master (#100)
by Marc-Alexandre
01:01
created

smartdispatch.open_with_dirlock()   A

Complexity

Conditions 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 20
rs 9.4285
1
import os
2
import time
3
import fcntl
4
import logging
5
6
from contextlib import contextmanager
7
8
have_psutil = True
9
try:
10
    import psutil
11
except ImportError:
12
    have_psutil = False
13
14
# Constants needed for `open_with_dirlock` function.
15
MAX_ATTEMPTS = 1000  # This would correspond to be blocked for ~15min.
16
TIME_BETWEEN_ATTEMPTS = 1  # In seconds
17
18
19
def find_mount_point(path='.'):
20
    """ Finds the mount point used to access `path`. """
21
    path = os.path.abspath(path)
22
    while not os.path.ismount(path):
23
        path = os.path.dirname(path)
24
25
    return path
26
27
28
def get_fs(path='.'):
29
    """ Gets info about the filesystem on which `path` lives. """
30
    mount = find_mount_point(path)
31
32
    for fs in psutil.disk_partitions(True):
33
        if fs.mountpoint == mount:
34
            return fs
35
36
37
@contextmanager
38
def open_with_flock(*args, **kwargs):
39
    """ Context manager for opening file with an exclusive lock. """
40
    f = open(*args, **kwargs)
41
    try:
42
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
43
    except IOError:
44
        logging.info("Can't immediately write-lock the file ({0}), waiting ...".format(f.name))
45
        fcntl.lockf(f, fcntl.LOCK_EX)
46
47
    yield f
48
    fcntl.lockf(f, fcntl.LOCK_UN)
49
    f.close()
50
51
52
@contextmanager
53
def open_with_dirlock(*args, **kwargs):
54
    """ Context manager for opening file with an exclusive lock using. """
55
    dirname = os.path.dirname(args[0])
56
    filename = os.path.basename(args[0])
57
    lockfile = os.path.join(dirname, "." + filename)
58
59
    no_attempt = 0
60
    while no_attempt < MAX_ATTEMPTS:
61
        try:
62
            os.mkdir(lockfile)  # Atomic operation
63
            f = open(*args, **kwargs)
64
            yield f
65
            f.close()
66
            os.rmdir(lockfile)
67
            break
68
        except OSError:
69
            logging.info("Can't immediately write-lock the file ({0}), retrying in {1} sec. ...".format(filename, TIME_BETWEEN_ATTEMPTS))
70
            time.sleep(TIME_BETWEEN_ATTEMPTS)
71
            no_attempt += 1
72
73
74
# Determine if we can rely on the fcntl module for locking files on the cluster.
75
# Otherwise, fallback on using the directory creation atomicity as a locking mechanism.
76
open_with_lock = open_with_dirlock
77
if have_psutil:
78
    fs = get_fs('.')
79
    if fs.fstype == "lustre" and "localflock" not in fs.opts:
80
        open_with_lock = open_with_flock
81