Passed: Push to master ( cba5ed...7e35c2 ) by Simon, created 04:17

DataCollector.save()    Rating: A

Complexity
    Conditions: 1

Size
    Total Lines: 2
    Code Lines: 2

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0
Metric    Value
cc        1
eloc      2
nop       3
dl        0
loc       2
rs        10
c         0
b         0
f         0
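
These are per-function measurements; the cyclomatic-complexity row, for example, can be reproduced with a source-metrics tool such as radon. A minimal sketch, assuming radon is installed and the listing below is saved under the hypothetical filename data_io.py:

# Minimal sketch: recompute cyclomatic complexity with radon (assumed tool).
from radon.complexity import cc_visit

with open("data_io.py") as f:  # hypothetical filename for the listing below
    source = f.read()

# cc_visit parses the source and returns one block per function/method,
# each carrying its cyclomatic-complexity score.
for block in cc_visit(source):
    print(block.name, block.complexity)

DataCollector.save() has a single branch-free statement in its body, so its complexity is 1, matching the cc row above.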
# Author: Simon Blanke
# Email: [email protected]
# License: MIT License


import os
import fcntl  # POSIX-only file locking; this module will not import on Windows
import contextlib
import pandas as pd


@contextlib.contextmanager
def atomic_overwrite(filename):
    # from: https://stackoverflow.com/questions/42409707/pandas-to-csv-overwriting-prevent-data-loss
    # Write to a temporary sibling file first, then rename it over the
    # target, so the original file is replaced only if the write succeeds.
    temp = filename + "~"
    with open(temp, "w") as f:
        yield f
    os.rename(temp, filename)  # this will only happen if no exception was raised


class DataIO:
    def __init__(self, path, drop_duplicates):
        self.path = path
        self.replace_existing = False
        self.drop_duplicates = drop_duplicates

        # The file mode is fixed at construction time; replace_existing
        # is toggled per call in atomic_write.
        if self.replace_existing:
            self.mode = "w"
        else:
            self.mode = "a"

    def _get_header(self, search_data, path):
        # Write the column header only if the file does not exist yet or
        # is about to be replaced; otherwise append rows without a header.
        if os.path.isfile(path):
            if self.replace_existing:
                header = search_data.columns
            else:
                header = False
        else:
            header = search_data.columns
        return header

    def _save_search_data(self, search_data, io_wrap, header):
        if self.drop_duplicates:
            search_data.drop_duplicates(subset=self.drop_duplicates, inplace=True)

        search_data.to_csv(io_wrap, index=False, header=header)

    def atomic_write(self, search_data, path, replace_existing):
        self.replace_existing = replace_existing
        header = self._get_header(search_data, path)

        with atomic_overwrite(path) as io_wrap:
            self._save_search_data(search_data, io_wrap, header)

    def locked_write(self, search_data, path):
        header = self._get_header(search_data, path)

        with open(path, self.mode) as io_wrap:
            # Hold an exclusive lock while writing so concurrent
            # processes cannot interleave their rows.
            fcntl.flock(io_wrap, fcntl.LOCK_EX)
            self._save_search_data(search_data, io_wrap, header)
            fcntl.flock(io_wrap, fcntl.LOCK_UN)

    def load(self, path):
        # Use the passed-in path rather than self.path, so callers can
        # load from any file, not only the one set at construction.
        if os.path.isfile(path) and os.path.getsize(path) > 0:
            return pd.read_csv(path)


class DataCollector:
    def __init__(self, path, drop_duplicates=False):
        self.path = path
        self.drop_duplicates = drop_duplicates

        self.path2file = path.rsplit("/", 1)[0] + "/"
        self.file_name = path.rsplit("/", 1)[1]

        self.io = DataIO(path, drop_duplicates)

    def load(self):
        return self.io.load(self.path)

    def append(self, dictionary):
        search_data = pd.DataFrame(dictionary, index=[0])
        self.io.locked_write(search_data, self.path)

    def save(self, dataframe, replace_existing=False):
        self.io.atomic_write(dataframe, self.path, replace_existing)
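
For reference, a minimal usage sketch of the classes above. It assumes a POSIX system (locked_write relies on fcntl) and a hypothetical, writable ./search_data.csv path:

import pandas as pd

collector = DataCollector("./search_data.csv", drop_duplicates=["x"])

# Append single rows; locked_write takes an exclusive lock so
# concurrent processes cannot interleave their output.
collector.append({"x": 1, "score": 0.5})
collector.append({"x": 2, "score": 0.7})

# Replace the whole file atomically with a full dataframe: the data is
# written to a temp file that is renamed over the target on success.
df = pd.DataFrame({"x": [1, 2, 3], "score": [0.5, 0.7, 0.9]})
collector.save(df, replace_existing=True)

print(collector.load())

The two write paths trade off differently: append/locked_write is cheap for incremental row-by-row collection, while save/atomic_write guarantees readers never observe a half-written file.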
87