Completed
Pull Request — master (#100)
by Marc-Alexandre
57s
created

smartdispatch.get_fs()   A

Complexity

Conditions 3

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 7
rs 9.4285
1
import os
2
import time
3
import fcntl
4
import psutil
5
import logging
6
7
from contextlib import contextmanager
8
9
# Constants needed for `open_with_dirlock` function.
10
MAX_ATTEMPTS = 1000  # This would correspond to be blocked for ~15min.
11
TIME_BETWEEN_ATTEMPTS = 1  # In seconds
12
13
14
def find_mount_point(path='.'):
15
    """ Finds the mount point used to access `path`. """
16
    path = os.path.abspath(path)
17
    while not os.path.ismount(path):
18
        path = os.path.dirname(path)
19
20
    return path
21
22
23
def get_fs(path='.'):
24
    """ Gets info about the filesystem on which `path` lives. """
25
    mount = find_mount_point(path)
26
27
    for fs in psutil.disk_partitions(True):
28
        if fs.mountpoint == mount:
29
            return fs
30
31
32
@contextmanager
33
def open_with_flock(*args, **kwargs):
34
    """ Context manager for opening file with an exclusive lock. """
35
    f = open(*args, **kwargs)
36
    try:
37
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
38
    except IOError:
39
        logging.info("Can't immediately write-lock the file ({0}), waiting ...".format(f.name))
40
        fcntl.lockf(f, fcntl.LOCK_EX)
41
42
    yield f
43
    fcntl.lockf(f, fcntl.LOCK_UN)
44
    f.close()
45
46
47
@contextmanager
48
def open_with_dirlock(*args, **kwargs):
49
    """ Context manager for opening file with an exclusive lock using. """
50
    dirname = os.path.dirname(args[0])
51
    filename = os.path.basename(args[0])
52
    lockfile = os.path.join(dirname, "." + filename)
53
54
    no_attempt = 0
55
    while no_attempt < MAX_ATTEMPTS:
56
        try:
57
            os.mkdir(lockfile)  # Atomic operation
58
            f = open(*args, **kwargs)
59
            yield f
60
            f.close()
61
            os.rmdir(lockfile)
62
            break
63
        except OSError:
64
            logging.info("Can't immediately write-lock the file ({0}), retrying in {1} sec. ...".format(filename, TIME_BETWEEN_ATTEMPTS))
65
            time.sleep(TIME_BETWEEN_ATTEMPTS)
66
            no_attempt += 1
67
68
69
def _fs_support_globalflock(fs):
70
    if fs.fstype == "lustre":
71
        return ("flock" in fs.opts) and "localflock" not in fs.opts
72
73
    elif fs.fstype == "gpfs":
74
        return True
75
76
    return False  # We don't know.
77
78
79
# Determine if we can rely on the fcntl module for locking files on the cluster.
80
# Otherwise, fallback on using the directory creation atomicity as a locking mechanism.
81
fs = get_fs('.')
82
if _fs_support_globalflock(fs):
83
    open_with_lock = open_with_flock
84
else:
85
    logging.warn("Cluster does not support flock!")
86
    open_with_lock = open_with_dirlock
87