Passed
Push — master ( 7d2c3d...b4b259 )
by Simon
01:56 queued 11s
created

DataIO._get_header()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 3
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
6
import os
7
import contextlib
8
import pandas as pd
9
from filelock import FileLock
10
11
12
@contextlib.contextmanager
def atomic_overwrite(filename):
    """Yield a writable text handle whose content replaces ``filename`` on success.

    The data is first written to the sibling file ``filename + "~"``. Only
    when the ``with`` body finishes without raising is that temporary file
    moved over ``filename``; otherwise ``filename`` is left untouched and the
    partial temporary file is removed.

    Parameters
    ----------
    filename : str
        Path of the file to (over)write.

    Yields
    ------
    file object
        The temporary file, opened in text mode ``"w"``.
    """
    # from: https://stackoverflow.com/questions/42409707/pandas-to-csv-overwriting-prevent-data-loss
    temp = filename + "~"
    try:
        with open(temp, "w") as f:
            yield f
    except BaseException:
        # The write failed: discard the partial temp file and keep the
        # original target as it was. A missing temp file is not an error.
        with contextlib.suppress(OSError):
            os.remove(temp)
        raise
    # Only reached if no exception was raised. os.replace overwrites an
    # existing destination on every platform, whereas os.rename raises on
    # Windows when the destination already exists.
    os.replace(temp, filename)
19
20
21
class DataIO:
    """Low-level CSV reading and writing for a single data file.

    Parameters
    ----------
    path : str
        Default path of the CSV file, used by ``load`` when no path is given.
    drop_duplicates : bool or list
        Falsy to keep every row. ``True`` drops rows that are duplicated
        across all columns. Any other truthy value is passed to
        ``DataFrame.drop_duplicates`` as ``subset`` (column labels).
    """

    def __init__(self, path, drop_duplicates):
        self.path = path
        self.replace_existing = False
        self.drop_duplicates = drop_duplicates

        # locked_write always appends. replace_existing starts out False, so
        # the mode never was anything other than "a".
        self.mode = "a"

    def _save_dataframe(self, dataframe, io_wrap):
        """Write ``dataframe`` as CSV to the open text handle ``io_wrap``.

        Duplicates are dropped on a copy, so the caller's dataframe is not
        modified. The header row is written only when the handle is at
        position 0, i.e. nothing has been written to the file yet.
        """
        if self.drop_duplicates:
            # True means "compare all columns", which pandas spells subset=None.
            subset = None if self.drop_duplicates is True else self.drop_duplicates
            dataframe = dataframe.drop_duplicates(subset=subset)

        dataframe.to_csv(io_wrap, index=False, header=not io_wrap.tell())

    def atomic_write(self, dataframe, path, replace_existing):
        """Write ``dataframe`` to ``path`` via a temporary file.

        NOTE(review): ``replace_existing`` is only stored on the instance;
        the target file is overwritten regardless of its value. Confirm
        whether ``False`` was meant to merge with existing data.
        """
        self.replace_existing = replace_existing

        with atomic_overwrite(path) as io_wrap:
            self._save_dataframe(dataframe, io_wrap)

    def locked_write(self, dataframe, path):
        """Append ``dataframe`` to ``path`` while holding ``path + ".lock~"``."""
        lock = FileLock(path + ".lock~")
        with lock:
            with open(path, self.mode) as io_wrap:
                self._save_dataframe(dataframe, io_wrap)

    def load(self, path=None):
        """Read a CSV file into a dataframe.

        Parameters
        ----------
        path : str, optional
            File to read. Defaults to the path given at construction.

        Returns
        -------
        pandas.DataFrame or None
            The file content, or ``None`` if the file does not exist or is
            empty.
        """
        if path is None:
            path = self.path

        if os.path.isfile(path) and os.path.getsize(path) > 0:
            return pd.read_csv(path)
        return None
63
64
65
class DataCollector:
    """Collect rows of data in a CSV file at ``path``.

    Parameters
    ----------
    path : str
        Path of the CSV file. Directory components are separated by ``/``.
    drop_duplicates : bool or list, optional
        Forwarded to ``DataIO``; falsy keeps every row.
    """

    def __init__(self, path, drop_duplicates=False):
        self.path = path
        self.drop_duplicates = drop_duplicates

        # Split at the last "/". rpartition also copes with a bare file name
        # (no "/"), where the directory part becomes "" instead of raising
        # IndexError. path2file + file_name always equals path.
        directory, separator, file_name = path.rpartition("/")
        self.path2file = directory + separator
        self.file_name = file_name

        self.io = DataIO(path, drop_duplicates)

    def load(self):
        """Return the stored data, as returned by ``DataIO.load``."""
        return self.io.load(self.path)

    def append(self, dictionary):
        """Append one row, built from ``dictionary``, to the file.

        The row is written through ``DataIO.locked_write``.
        """
        dataframe = pd.DataFrame(dictionary, index=[0])
        self.io.locked_write(dataframe, self.path)

    def save(self, dataframe, replace_existing=False):
        """Write ``dataframe`` to the file through ``DataIO.atomic_write``."""
        self.io.atomic_write(dataframe, self.path, replace_existing)
84