Completed
Pull Request — master (#2842)
by Edward
05:16
created

TaskIndicator.__exit__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import struct
18
import subprocess
19
import sys
20
21
from st2client.utils.color import format_status
22
23
__all__ = [
24
    'get_terminal_size'
25
]
26
27
28
def get_terminal_size(default=(80, 20)):
29
    """
30
    :return: (lines, cols)
31
    """
32
    def ioctl_GWINSZ(fd):
33
        import fcntl
34
        import termios
35
        return struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
36
    # try stdin, stdout, stderr
37
    for fd in (0, 1, 2):
38
        try:
39
            return ioctl_GWINSZ(fd)
40
        except:
41
            pass
42
    # try os.ctermid()
43
    try:
44
        fd = os.open(os.ctermid(), os.O_RDONLY)
45
        try:
46
            return ioctl_GWINSZ(fd)
47
        finally:
48
            os.close(fd)
49
    except:
50
        pass
51
    # try `stty size`
52
    try:
53
        process = subprocess.Popen(['stty', 'size'],
54
                                   shell=False,
55
                                   stdout=subprocess.PIPE,
56
                                   stderr=open(os.devnull, 'w'))
57
        result = process.communicate()
58
        if process.returncode == 0:
59
            return tuple(int(x) for x in result[0].split())
60
    except:
61
        pass
62
    # try environment variables
63
    try:
64
        return tuple(int(os.getenv(var)) for var in ('LINES', 'COLUMNS'))
65
    except:
66
        pass
67
    #  return default.
68
    return default
69
70
71
class TaskIndicator(object):
72
    def __enter__(self):
73
        return self
74
75
    def __exit__(self, type, value, traceback):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in type.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
76
        return self.close()
77
78
    def add_stage(self, status, name):
79
        self._write('\t[{:^20}] {}'.format(format_status(status), name))
80
81
    def update_stage(self, status, name):
82
        self._write('\t[{:^20}] {}'.format(format_status(status), name), override=True)
83
84
    def finish_stage(self, status, name):
85
        self._write('\t[{:^20}] {}'.format(format_status(status), name), override=True)
86
87
    def close(self):
88
        self._write('\n')
89
90
    def _write(self, string, override=False):
91
        if override:
92
            sys.stdout.write('\r')
93
        else:
94
            sys.stdout.write('\n')
95
96
        sys.stdout.write(string)
97
        sys.stdout.flush()
98