Completed
Push — master ( b6035d...10c423 )
by Andy
32s
created

wrapper.__init__()   A

Complexity

Conditions 3

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 3
c 1
b 1
f 0
dl 0
loc 5
rs 9.4285
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""
5
General utility functions.
6
"""
7
8
__all__ = ["short_hash", "wrapper"]
9
10
import functools
11
import logging
12
import os
13
import pickle
14
import signal
15
import sys
16
from six import string_types
17
from tempfile import mkstemp
18
from time import time
19
from collections import Iterable
20
from hashlib import md5
21
from multiprocessing.pool import Pool
22
from multiprocessing import Lock, TimeoutError, Value
23
24
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
25
26
27
# Global progress counter: a `multiprocessing.Value` holding a signed integer,
# initialised to zero. It is reset in `wrapper._init_progressbar` and
# incremented in `wrapper._update_progressbar`.
# NOTE(review): `_init_pool` rebinds this name, presumably so that pool worker
# processes share one counter -- confirm against the code that creates the Pool.
28
_counter = Value('i', 0)
29
# Lock held while `_counter.value` is reset or incremented.
_counter_lock = Lock()
30
31
def _init_pool(args):
    """
    Rebind the module-level `_counter` to the object supplied.

    :param args:
        The object to install as the module-level `_counter`.
        NOTE(review): presumably the shared `multiprocessing.Value` handed to
        each worker through a Pool `initializer` -- confirm against the caller.
    """
    globals()["_counter"] = args
34
35
class wrapper(object):
    """
    A generic wrapper with a progressbar, which can be used either in serial or
    in parallel.

    :param f:
        The function to apply.

    :param args:
        Additional positional arguments to supply to the function `f`. These
        are appended after the items of each `x` given to `__call__`.

    :param kwds:
        Keyword arguments to supply to the function `f`.

    :param N:
        The number of items that will be iterated over. If `N` is zero or
        negative, no progressbar is shown.

    :param message: [optional]
        An information message to log before showing the progressbar.

    :param size: [optional]
        The maximum width of the progressbar in characters.

    :returns:
        A callable object: calling it with `x` returns
        `f(*(list(x) + args), **kwds)` and advances the progressbar by one.
    """

    def __init__(self, f, args, kwds, N, message=None, size=100):
        self.f = f
        self.args = list(args if args is not None else [])
        self.kwds = kwds if kwds is not None else {}
        # `size` used to be accepted but ignored (the width was hard-coded to
        # 100). Store it before the progressbar is initialised so it is used.
        self.size = max(1, int(size))
        self._init_progressbar(N, message)

    def _init_progressbar(self, N, message=None):
        """
        Initialise a progressbar.

        Sets `self.N`, `self.W` (bar width), `self.t_init` (start time) and
        `self.message`, and resets the module-level counter to zero when
        `N` is positive.

        :param N:
            The number of items that will be iterated over.

        :param message: [optional]
            An information message to log before showing the progressbar.
        """

        self.N = int(N)
        # Fall back to 100 if this is called on an instance without `size`.
        max_width = getattr(self, "size", 100)

        try:
            pipe = os.popen('stty size', 'r')
            try:
                rows, columns = pipe.read().split()
            finally:
                # Always close the pipe so the child process is reaped.
                pipe.close()
            # Leave room for the brackets, percentage and status text.
            available = int(columns) - (12 + 21 + 2 * len(str(self.N)))

        except Exception:
            logger.debug("Couldn't get screen size. Progressbar may look odd.")
            self.W = max_width

        else:
            # A narrow terminal must not produce a zero or negative width: that
            # would break the format spec used when drawing the bar.
            self.W = max(1, min(max_width, available))

        self.t_init = time()
        self.message = message
        if self.N <= 0:
            return None

        if message is not None:
            logger.info(message.rstrip())

        sys.stdout.flush()
        with _counter_lock:
            _counter.value = 0

    def _update_progressbar(self):
        """
        Increment the progressbar by one iteration and redraw it on stdout.
        """

        if self.N <= 0:
            return None

        # Read the incremented value while still holding the lock, so another
        # worker cannot change it between the increment and the read.
        with _counter_lock:
            _counter.value += 1
            index = _counter.value

        elapsed = time() - self.t_init
        eta_minutes = (elapsed / index) * (self.N - index) / 60.0
        # Completed fraction, clamped so the bar can never exceed its width.
        fraction = min(1.0, float(index) / self.N)

        if index >= self.N:
            status = "({0:.0f}s)                         ".format(elapsed)

        elif fraction >= 0.05 and eta_minutes > 1:
            # MAGIC fraction for when we can predict ETA
            status = "({0}/{1}; ~{2:.0f}m until finished)".format(
                index, self.N, eta_minutes)

        else:
            status = "({0}/{1})                          ".format(index, self.N)

        # Scale the filled part by the completed fraction so that the bar is
        # exactly full at 100%, whatever the ratio of N to the bar width.
        sys.stdout.write(
            ("\r[{done: <" + str(self.W) + "}] {percent:3.0f}% {status}").format(
                done="=" * int(self.W * fraction),
                percent=100. * fraction,
                status=status))
        sys.stdout.flush()

        if index >= self.N:
            sys.stdout.write("\r\n")
            sys.stdout.flush()

    def __call__(self, x):
        """
        Apply the wrapped function to one item and advance the progressbar.

        :param x:
            An iterable of leading positional arguments for `f`.

        :returns:
            The value returned by `f(*(list(x) + self.args), **self.kwds)`.

        :raises:
            Any exception raised by `f`, re-raised after it has been logged.
        """
        try:
            result = self.f(*(list(x) + self.args), **self.kwds)
        except Exception:
            logger.exception("Exception within wrapped function")
            raise

        self._update_progressbar()
        return result
155
156
157
def short_hash(contents):
    """
    Build a compact hash string from one item or an iterable of items.

    :param contents:
        A single object, or an iterable of objects, to hash. Anything that is
        not an `Iterable` is treated as a one-item list. Note that a string is
        itself iterable, so it is hashed character by character.

    :returns:
        The first 10 hexadecimal characters of each item's MD5 digest (taken
        over the UTF-8 encoding of `str(item)`), joined in iteration order.
    """
    items = contents if isinstance(contents, Iterable) else [contents]

    pieces = []
    for item in items:
        digest = md5(str(item).encode("utf-8")).hexdigest()
        pieces.append(str(digest)[:10])
    return "".join(pieces)
171
172
173
def _unpack_value(value):
    """
    Unpack contents if it is pickled to a temporary file.

    :param value:
        A non-string variable or a string referring to a pickled file path.

    :returns:
        The original value, or the unpickled contents if `value` is a string
        naming an existing regular file.
    """

    # Only regular files are candidates for unpickling. A plain string that
    # names an existing directory is passed through unchanged instead of
    # failing inside `open`.
    if not (isinstance(value, string_types) and os.path.isfile(value)):
        return value

    # NOTE(security): `pickle.load` can execute arbitrary code. Only paths to
    # files written by trusted code should ever reach this function.
    with open(value, "rb") as fp:
        return pickle.load(fp)
189
190
191
def _pack_value(value, protocol=-1):
    """
    Pack contents to a temporary file.

    :param value:
        The contents to temporarily pickle.

    :param protocol: [optional]
        The pickling protocol to use.

    :returns:
        A temporary filename where the contents are stored. The file is not
        removed by this function.

    :raises:
        Any exception raised while pickling; the temporary file is removed
        before the exception is re-raised.
    """

    # `mkstemp` returns an already-open OS-level file descriptor. Wrap that
    # descriptor rather than re-opening the path, so it is closed when the
    # `with` block exits instead of leaking one descriptor per call.
    fd, temporary_filename = mkstemp()
    try:
        with os.fdopen(fd, "wb") as fp:
            pickle.dump(value, fp, protocol)
    except Exception:
        # Do not leave a partially written file behind.
        os.remove(temporary_filename)
        raise
    return temporary_filename
209