Completed
Push — master ( e6dbce...addc19 )
by Roy
02:40
created

format_date()   F

Complexity

Conditions 26

Size

Total Lines 75

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 26
dl 0
loc 75
rs 2.4695
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, as described above.

Complexity

Complex classes like format_date() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2012-11-06 11:50:13
7
8
import logging
9
import hashlib
10
import datetime
11
import socket
12
import base64
13
14
import six
15
from six import iteritems
16
17
def md5string(x):
    """Return the hexadecimal MD5 digest of ``x`` after passing it through ``utf8``."""
    return hashlib.md5(utf8(x)).hexdigest()
18
19
20
class ReadOnlyDict(dict):
    """A dict that rejects item assignment (``d[key] = value``)."""

    def __setitem__(self, key, value):
        # Subscript assignment is the only operation overridden here.
        message = "dict is read-only"
        raise Exception(message)
25
26
27
def getitem(obj, key=0, default=None):
    """Return ``obj[key]``, or ``default`` when the lookup fails.

    With the default ``key=0`` this yields the first element of a sequence.

    :param obj: any object; it does not have to support subscription
    :param key: index or key to look up
    :param default: value returned when ``obj[key]`` raises
    """
    try:
        return obj[key]
    except Exception:
        # Deliberately broad (IndexError, KeyError, TypeError, ...), but no
        # longer a bare ``except:`` so KeyboardInterrupt and SystemExit
        # still propagate.
        return default
33
34
35
def hide_me(tb, g=globals()):
    """Trim leading traceback entries up to and including those whose frame globals are ``g``.

    Walks ``tb`` past the entries whose frame globals are not ``g``, then past
    the entries whose frame globals are ``g``, and returns what is left. The
    original traceback is returned when nothing is left or when the walk fails.
    """
    start = tb
    try:
        # Pass 1 skips frames outside ``g``; pass 2 skips frames inside it.
        for inside in (False, True):
            while tb and (tb.tb_frame.f_globals is g) == inside:
                tb = tb.tb_next
    except Exception as e:
        logging.exception(e)
        return start
    return tb or start
49
50
51
def run_in_thread(func, *args, **kwargs):
    """Start ``func(*args, **kwargs)`` on a daemon thread and return the started Thread."""
    import threading

    worker = threading.Thread(target=func, args=args, kwargs=kwargs)
    worker.daemon = True
    worker.start()
    return worker
58
59
60
def run_in_subprocess(func, *args, **kwargs):
    """Start ``func(*args, **kwargs)`` in a daemon child process and return the started Process."""
    import multiprocessing

    child = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
    child.daemon = True
    child.start()
    return child
67
68
69
def _relative_age(seconds):
    """Render an age of less than one day as "N seconds/minutes/hours ago"."""
    if seconds < 50:
        return ("1 second ago" if seconds <= 1 else
                "%(seconds)d seconds ago") % {"seconds": seconds}

    if seconds < 50 * 60:
        minutes = round(seconds / 60.0)
        return ("1 minute ago" if minutes <= 1 else
                "%(minutes)d minutes ago") % {"minutes": minutes}

    hours = round(seconds / (60.0 * 60))
    return ("1 hour ago" if hours <= 1 else
            "%(hours)d hours ago") % {"hours": hours}


def _absolute_template(days, was_yesterday, relative, shorter):
    """Pick the short absolute template for a date ``days`` old, or None if only the full format fits."""
    if days == 0:
        return "%(time)s"
    if days == 1 and was_yesterday and relative:
        return "yesterday" if shorter else "yesterday at %(time)s"
    if days < 5:
        return "%(weekday)s" if shorter else "%(weekday)s at %(time)s"
    if days < 334:  # 11mo, since confusing for same month last year
        return "%(month)s-%(day)s" if shorter else \
            "%(month)s-%(day)s at %(time)s"
    return None


def format_date(date, gmt_offset=0, relative=True, shorter=False, full_format=False):
    """Formats the given date (which should be GMT).

    By default, we return a relative time (e.g., "2 minutes ago"). You
    can return an absolute date string with ``relative=False``.

    You can force a full format date ("July 10, 1980") with
    ``full_format=True``.

    This method is primarily intended for dates in the past.
    For dates in the future, we fall back to full format.

    From tornado

    :param date: a naive ``datetime`` or an int/float Unix timestamp;
        any falsy value (``None``, ``0``) yields ``'-'``
    :param gmt_offset: minutes subtracted from ``date`` to get the local
        time used for display
    :param relative: allow "... ago" / "yesterday" wording
    :param shorter: omit the "at <time>" suffix
    :param full_format: always use the "<month> <day>, <year>" form
    :return: the formatted string
    """
    if not date:
        return '-'
    if isinstance(date, (int, float)):
        date = datetime.datetime.utcfromtimestamp(date)
    now = datetime.datetime.utcnow()
    if date > now:
        # ``timedelta.seconds`` ignores whole days, so it would treat a date
        # 1 day + 30 seconds ahead as "almost now"; compare the full span.
        if relative and (date - now).total_seconds() < 60:
            # Due to clock skew, some things are slightly in the future.
            # Round timestamps in the immediate future down to now in
            # relative mode.
            date = now
        else:
            # Otherwise, future dates always use the full format.
            full_format = True

    offset = datetime.timedelta(minutes=gmt_offset)
    local_date = date - offset
    local_yesterday = (now - offset) - datetime.timedelta(hours=24)
    age = now - date

    template = None
    if not full_format:
        if relative and age.days == 0:
            return _relative_age(age.seconds)
        template = _absolute_template(
            age.days, local_date.day == local_yesterday.day, relative, shorter)

    if template is None:
        template = "%(month_name)s %(day)s, %(year)s" if shorter else \
            "%(month_name)s %(day)s, %(year)s at %(time)s"

    str_time = "%d:%02d" % (local_date.hour, local_date.minute)

    return template % {
        "month_name": local_date.strftime('%b'),
        "weekday": local_date.strftime('%A'),
        "day": str(local_date.day),
        "year": str(local_date.year),
        "month": local_date.month,
        "time": str_time
    }
145
146
147
class TimeoutError(Exception):
    """Raised by ``timeout`` when the guarded block runs past its time limit."""


try:
    import signal
    if not hasattr(signal, 'SIGALRM'):
        raise ImportError('signal')

    class timeout:
        """
        Time limit of command

        with timeout(3):
            time.sleep(10)

        Implemented with SIGALRM: entering the block installs a handler and
        arms ``signal.alarm``; leaving it cancels the alarm and puts back the
        handler that was installed before. A falsy ``seconds`` disables the
        limit entirely.
        """

        def __init__(self, seconds=1, error_message='Timeout'):
            self.seconds = seconds
            self.error_message = error_message
            # Handler replaced in __enter__, kept so __exit__ can restore it.
            self._previous_handler = None

        def handle_timeout(self, signum, frame):
            """SIGALRM handler: abort the guarded block with TimeoutError."""
            raise TimeoutError(self.error_message)

        def __enter__(self):
            if self.seconds:
                self._previous_handler = signal.signal(
                    signal.SIGALRM, self.handle_timeout)
                signal.alarm(self.seconds)

        def __exit__(self, type, value, traceback):
            if self.seconds:
                signal.alarm(0)
                # signal.signal() returns None for handlers not installed
                # from Python; those cannot be re-installed, so skip them.
                if self._previous_handler is not None:
                    signal.signal(signal.SIGALRM, self._previous_handler)
                    self._previous_handler = None
except ImportError:
    class timeout:
        """
        Time limit of command (for windows)

        No-op stand-in used when SIGALRM is unavailable: no limit is enforced.
        """

        def __init__(self, seconds=1, error_message='Timeout'):
            pass

        def __enter__(self):
            pass

        def __exit__(self, type, value, traceback):
            pass
192
193
194
def utf8(string):
    """
    Return ``string`` as utf8 encoded bytes.

    Bytes are passed through untouched and text is encoded; any other object
    is first converted to text and then encoded.
    """
    if isinstance(string, six.binary_type):
        return string
    if not isinstance(string, six.text_type):
        string = six.text_type(string)
    return string.encode('utf8')
206
207
208
def text(string, encoding='utf8'):
    """
    Return ``string`` as unicode text.

    Bytes are decoded with ``encoding`` and text is returned unchanged; any
    other object is converted with the text type constructor.
    """
    if isinstance(string, six.binary_type):
        return string.decode(encoding)
    if isinstance(string, six.text_type):
        return string
    return six.text_type(string)
220
221
222
def pretty_unicode(string):
    """
    Return ``string`` as unicode: decoded as utf8 when possible, otherwise as
    a unicode-escaped rendering of its Latin-1 decoding.
    """
    if not isinstance(string, six.text_type):
        try:
            string = string.decode("utf8")
        except UnicodeDecodeError:
            # Not valid utf8: show the raw bytes as escape sequences instead.
            escaped = string.decode('Latin-1').encode('unicode_escape')
            string = escaped.decode("utf8")
    return string
232
233
234
def unicode_string(string):
    """
    Make sure string is unicode, try to decode with utf8, or base64 if failed.

    Text is returned unchanged. Bytes that are not valid utf8 are wrapped as
    ``[BASE64-DATA]<base64>[/BASE64-DATA]``.

    Can be decoded by `decode_unicode_string`
    """
    if isinstance(string, six.text_type):
        return string
    try:
        return string.decode("utf8")
    except UnicodeDecodeError:
        # b64encode returns bytes; decode it so the concatenation with the
        # str markers does not raise TypeError on Python 3.
        encoded = base64.b64encode(string).decode('ascii')
        return '[BASE64-DATA]' + encoded + '[/BASE64-DATA]'
246
247
248
def unicode_dict(_dict):
    """
    Return a new dict with every key and value passed through ``unicode_obj``.
    """
    return {unicode_obj(key): unicode_obj(value)
            for key, value in iteritems(_dict)}
256
257
258
def unicode_list(_list):
    """
    Return a new list with every element passed through ``unicode_obj``.
    """
    converted = []
    for item in _list:
        converted.append(unicode_obj(item))
    return converted
263
264
265
def unicode_obj(obj):
    """
    Make sure keys and values of dict/list/tuple is unicode.

    Dicts go through ``unicode_dict``, lists and tuples through
    ``unicode_list`` and strings through ``unicode_string``. Numbers and
    ``None`` are returned unchanged. Anything else is converted with
    ``text``, falling back to the text of its ``repr`` if that fails.

    Can be decoded by `decode_unicode_obj`
    """
    # NOTE(review): on Python 3 ``six.string_types`` does not include bytes,
    # so bytes presumably reach the final fallback rather than
    # ``unicode_string`` -- confirm whether that is intended.
    if isinstance(obj, dict):
        return unicode_dict(obj)
    elif isinstance(obj, (list, tuple)):
        return unicode_list(obj)
    elif isinstance(obj, six.string_types):
        return unicode_string(obj)
    elif isinstance(obj, (int, float)):
        return obj
    elif obj is None:
        return obj
    else:
        try:
            return text(obj)
        except Exception:
            # Was a bare ``except:``; KeyboardInterrupt and SystemExit now
            # propagate instead of being turned into a repr string.
            return text(repr(obj))
286
287
288
def decode_unicode_string(string):
    """
    Reverse `unicode_string`: unwrap and base64-decode a
    ``[BASE64-DATA]...[/BASE64-DATA]`` string; return anything else unchanged.
    """
    prefix, suffix = '[BASE64-DATA]', '[/BASE64-DATA]'
    if not (string.startswith(prefix) and string.endswith(suffix)):
        return string
    payload = string[len(prefix):-len(suffix)]
    return base64.b64decode(payload)
295
296
297
def decode_unicode_obj(obj):
    """
    Reverse `unicode_obj` for nested dict/list/tuple structures.

    Dict keys and every string go through `decode_unicode_string`; lists and
    tuples both come back as lists; other values are returned unchanged.
    """
    if isinstance(obj, dict):
        return {decode_unicode_string(key): decode_unicode_obj(value)
                for key, value in iteritems(obj)}
    if isinstance(obj, six.string_types):
        return decode_unicode_string(obj)
    if isinstance(obj, (list, tuple)):
        return list(map(decode_unicode_obj, obj))
    return obj
312
313
314
class Get(object):
    """
    Lazy value holder: wraps a zero-argument callable and invokes it each
    time the descriptor ``__get__`` hook is called.
    """

    def __init__(self, getter):
        self.getter = getter

    def __get__(self, instance, owner):
        # ``instance`` and ``owner`` are accepted for the descriptor protocol
        # but are not passed on to the getter.
        compute = self.getter
        return compute()
324
325
326
class _MissingKeyError(AttributeError, KeyError):
    """Missing key accessed as an attribute; catchable as either base class."""


class ObjectDict(dict):
    """
    Object like dict, every dict[key] can be visited by dict.key

    If dict[key] has a ``__get__`` method (e.g. a `Get`), the result of
    calling it is returned instead of the stored value.
    """

    def __getattr__(self, name):
        try:
            ret = self.__getitem__(name)
        except KeyError:
            # A bare KeyError breaks hasattr(), getattr(obj, name, default)
            # and copy/pickle protocol lookups on Python 3, which only
            # treat AttributeError as "missing". The raised error is still
            # a KeyError, so existing ``except KeyError`` callers keep working.
            raise _MissingKeyError(name)
        if hasattr(ret, '__get__'):
            return ret.__get__(self, ObjectDict)
        return ret
338
339
340
def load_object(name):
    """Import and return the object named by a dotted ``module.object`` path.

    Raises ``Exception`` when ``name`` contains no dot.
    """
    module_name, dot, object_name = name.rpartition('.')
    if not dot:
        raise Exception('load object need module.object')

    if six.PY2:
        # Python 2: bytes fromlist entry and import level -1.
        import_args = (module_name, globals(), locals(), [utf8(object_name)], -1)
    else:
        import_args = (module_name, globals(), locals(), [object_name])
    module = __import__(*import_args)
    return getattr(module, object_name)
352
353
354
def get_python_console(namespace=None):
    """
    Build an interactive python console object without starting it.

    When ``namespace`` is None, the console namespace is a copy of the
    calling frame's globals updated with its locals. An IPython terminal
    shell is returned when IPython can be imported; otherwise a
    ``code.InteractiveConsole`` with an injected ``ask_exit`` method and,
    when ``readline`` is available, tab completion.
    """
    if namespace is None:
        import inspect
        this_frame = inspect.currentframe()
        origin = this_frame.f_back
        if not origin:
            logging.error("can't find caller who start this console.")
            origin = this_frame
        namespace = dict(origin.f_globals)
        namespace.update(origin.f_locals)

    try:
        from IPython.terminal.interactiveshell import TerminalInteractiveShell
        return TerminalInteractiveShell(user_ns=namespace)
    except ImportError:
        pass

    # Plain-console fallback: enable tab completion when readline is available.
    try:
        import readline
        import rlcompleter
        completer = rlcompleter.Completer(namespace)
        readline.set_completer(completer.complete)
        readline.parse_and_bind("tab: complete")
    except ImportError:
        pass

    import code
    shell = code.InteractiveConsole(namespace)
    shell._quit = False

    def request_exit():
        shell._quit = True

    def read_line(prompt=""):
        # Once an exit was requested, end the session the same way EOF does.
        if shell._quit:
            raise EOFError
        return six.moves.input(prompt)

    # inject exit method
    shell.ask_exit = request_exit
    shell.raw_input = read_line
    return shell
397
398
399
def python_console(namespace=None):
    """Open an interactive python console and return the result of its ``interact()`` call.

    When ``namespace`` is None, the console namespace is a copy of the
    calling frame's globals updated with its locals.
    """
    if namespace is None:
        import inspect
        this_frame = inspect.currentframe()
        origin = this_frame.f_back
        if not origin:
            logging.error("can't find caller who start this console.")
            origin = this_frame
        namespace = dict(origin.f_globals)
        namespace.update(origin.f_locals)

    console = get_python_console(namespace=namespace)
    return console.interact()
413
414
415
def check_port_open(port, addr='127.0.0.1'):
    """Return True when a TCP connection to ``addr:port`` can be established.

    :param port: TCP port number to probe
    :param addr: IPv4 address or host name to connect to
    :return: ``True`` if ``connect_ex`` reports success (0), else ``False``
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        return sock.connect_ex((addr, port)) == 0
    finally:
        # Always release the descriptor; the probe socket was never closed before.
        sock.close()
422