Completed
Push — 0.5.3 ( f700f6...c9a091 )
by Felipe A.
01:03
created

TarFileStream   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 140
Duplicated Lines 0 %

Importance

Changes 8
Bugs 0 Features 2
Metric Value
c 8
b 0
f 2
dl 0
loc 140
rs 10
wmc 14

5 Methods

Rating   Name   Duplication   Size   Complexity  
B read() 0 36 4
A __iter__() 0 14 2
B fill() 0 23 5
A write() 0 19 2
B __init__() 0 29 1
1
2
import os
3
import os.path
4
import tarfile
5
import functools
6
import threading
7
8
9
class TarFileStream(object):
    '''
    Tarfile which compresses while reading for streaming.

    Buffsize can be provided, it must be a multiple of 512 (the tar block
    size) for compression.

    Note on coroutines: this class uses threading by default, but
    coroutine-based applications can change this behavior by overriding the
    :attr:`event_class` and :attr:`thread_class` values.
    '''
    event_class = threading.Event
    thread_class = threading.Thread
    tarfile_class = tarfile.open

    def __init__(self, path, buffsize=10240, exclude=None):
        '''
        Internal tarfile object will be created, and compression will start
        on a thread until the buffer becomes full, with writes staying locked
        until a read occurs.

        :param path: local path of directory whose content will be compressed.
        :type path: str
        :param buffsize: size of internal buffer in bytes, defaults to 10KiB
        :type buffsize: int
        :param exclude: path filter function, defaults to None
        :type exclude: callable
        '''
        self.path = path
        self.name = os.path.basename(path) + ".tgz"
        self.exclude = exclude

        # 0: still compressing, 1: compression ended,
        # 2: the empty end-of-stream chunk has already been returned
        self._finished = 0
        self._want = 0
        self._data = bytes()
        self._add = self.event_class()  # set by read: write may proceed
        self._result = self.event_class()  # set when read may collect data
        self._tarfile = self.tarfile_class(  # stream write
            fileobj=self,
            mode="w|gz",
            bufsize=buffsize
            )
        self._th = self.thread_class(target=self.fill)
        self._th.start()

    def fill(self):
        '''
        Writes data on internal tarfile instance, which writes to current
        object, using :meth:`write`.

        As this method is blocking, it is used inside a thread.

        This method is called automatically, on a thread, on initialization,
        so there is little need to call it manually.

        The stream is always flagged as finished and the reader is always
        woken up, even if adding files or closing the tarfile fails, so a
        pending :meth:`read` cannot wait forever on a dead thread.
        '''
        try:
            try:
                if self.exclude:
                    exclude = self.exclude
                    ap = functools.partial(os.path.join, self.path)
                    self._tarfile.add(
                        self.path, "",
                        filter=lambda info: (
                            None if exclude(ap(info.name)) else info
                            )
                        )
                else:
                    self._tarfile.add(self.path, "")
            finally:
                self._tarfile.close()  # force stream flush
        finally:
            self._finished += 1
            self._result.set()

    def write(self, data):
        '''
        Write method used by internal tarfile instance to output data.
        This method blocks tarfile execution once internal buffer is full.

        As this method is blocking, it is used inside the same thread of
        :meth:`fill`.

        :param data: bytes to write to internal buffer
        :type data: bytes
        :returns: number of bytes written
        :rtype: int
        '''
        self._add.wait()
        self._data += data
        if len(self._data) > self._want:
            # enough data for the pending read: stop accepting writes first,
            # then wake the reader up
            self._add.clear()
            self._result.set()
        return len(data)

    def _take(self, want):
        '''
        Remove and return up to `want` bytes from the internal buffer, or the
        whole buffer if `want` is 0.

        :param want: maximum number of bytes to take, 0 means everything
        :type want: int
        :returns: bytes removed from the internal buffer
        :rtype: bytes
        '''
        if want:
            data = self._data[:want]
            self._data = self._data[want:]
        else:
            data = self._data
            self._data = bytes()
        return data

    def read(self, want=0):
        '''
        Read method, gets data from internal buffer while releasing
        :meth:`write` locks when needed.

        The lock usage means it must run on a different thread than
        :meth:`fill`, ie. the main thread, otherwise it will deadlock.

        The combination of both write and this method running on different
        threads makes tarfile being streamed on-the-fly, with data chunks being
        processed and retrieved on demand.

        Once compression has ended, any bytes still buffered are returned
        first, then a single empty bytes object marks the end of the stream.

        :param want: number bytes to read, defaults to 0 (all available)
        :type want: int
        :returns: tarfile data as bytes
        :rtype: bytes
        :raises EOFError: if called again after the empty end-of-stream chunk
                          has been returned
        '''
        if self._finished:
            if self._data:
                # writer is done, so the buffer is stable: hand out leftovers
                # instead of dropping them
                return self._take(want)
            if self._finished == 1:
                self._finished += 1
                return b""
            raise EOFError("EOF reached")

        # Thread communication
        self._want = want
        self._add.set()
        self._result.wait()
        self._result.clear()

        return self._take(want)

    def __iter__(self):
        '''
        Iterate through tarfile result chunks.

        Similarly to :meth:`read`, this method must run on a different thread
        than :meth:`write` calls.

        :yields: data chunks as taken from :meth:`read`.
        :ytype: bytes
        '''
        data = self.read()
        while data:
            yield data
            data = self.read()
149