test_realsocket_makefile()   D
last analyzed

Complexity

Conditions 9

Size

Total Lines 51

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 9
dl 0
loc 51
rs 4.2844

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
from __future__ import print_function
2
3
import os
4
import re
5
import socket
6
import warnings
7
8
import pytest
9
from process_tests import dump_on_error
10
from process_tests import wait_for_strings
11
12
import aspectlib
13
from aspectlib.test import mock
14
from aspectlib.test import record
15
from aspectlib.utils import PYPY
16
17
try:
18
    import thread
19
except ImportError:
20
    import _thread as thread
21
22
try:
23
    from StringIO import StringIO
24
except ImportError:
25
    from io import StringIO
26
27
# Expected shape of the debug log written while ``test_socket`` attempts its
# refused connection: first the ``connect`` call line (including the stack
# trace entry pointing back at ``test_socket``), then the "raised" line.
# ``[\\/]`` accepts both POSIX and Windows path separators; the pattern already
# caters for Windows through the 10061 (WSAECONNREFUSED) error code.
LOG_TEST_SOCKET = r"""^\{_?socket(object)?\}\.connect\(\('127\.0\.0\.1', 1\)\) +<<< .*tests[\\/]test_integrations\.py:\d+:test_socket.*
\{_?socket(object)?\}\.connect \~ raised .*(ConnectionRefusedError|error)\((10061|111), .*refused.*\)"""
29
30
31
def test_mock_builtin():
    """Weave the builtin ``open`` with ``mock('foobar')`` and check it is undone afterwards."""
    with aspectlib.weave(open, mock('foobar')):
        assert open('???') == 'foobar'

    # Outside the weave the real ``open`` is expected to run again. Keep the
    # handle so it can be closed instead of leaking until garbage collection.
    handle = open(__file__)
    assert handle != 'foobar'
    handle.close()
36
37
38
def test_mock_builtin_os():
    """Weave ``os.open`` (given as a dotted string) with a mock and check it is undone afterwards."""
    print(os.open.__name__)
    with aspectlib.weave('os.open', mock('foobar')):
        assert os.open('???') == 'foobar'

    # The real ``os.open`` hands back a file descriptor; close it once the
    # assertion has passed so the test does not leak it.
    fd = os.open(__file__, 0)
    assert fd != 'foobar'
    os.close(fd)
44
45
46
def test_record_warning():
    """Weave ``warnings.warn`` with ``record`` and check the call shows up in ``.calls``."""
    expected_calls = [(None, ('crap',), {})]
    with aspectlib.weave('warnings.warn', record):
        warnings.warn('crap')
        # While woven, the replacement exposes the recorded calls.
        recorded_calls = warnings.warn.calls
        assert recorded_calls == expected_calls
50
51
52
@pytest.mark.skipif(not hasattr(os, 'fork'), reason="os.fork not available")
def test_fork():
    """Weave ``os.fork`` with a mock, then check the real ``os.fork`` is back afterwards."""
    with aspectlib.weave('os.fork', mock('foobar')):
        pid = os.fork()
        if not pid:
            # Only reachable if the mock did not take effect and a real child
            # was forked: leave immediately without running the rest.
            os._exit(0)
        assert pid == 'foobar'

    pid = os.fork()
    if not pid:
        # Child of the real fork: exit right away, skipping normal teardown.
        os._exit(0)
    assert pid != 'foobar'
    # Reap the child so it does not linger as a zombie for the rest of the run.
    os.waitpid(pid, 0)
64
65
def test_socket(target=socket.socket):
    """Weave ``target`` with a debug logger and check the log of a refused connection.

    A connection to 127.0.0.1 port 1 is attempted once under the weave and once
    after it; the captured log is matched against ``LOG_TEST_SOCKET`` both times.

    :param target: What to weave; either the socket class itself or its dotted name.
    """
    buf = StringIO()
    with aspectlib.weave(target, aspectlib.debug.log(
        print_to=buf,
        stacktrace=4,
        module=False
    ), lazy=True):
        s = socket.socket()
        try:
            # Keep this call inline: LOG_TEST_SOCKET expects ``test_socket``
            # in the logged stack trace.
            s.connect(('127.0.0.1', 1))
        except Exception:
            # The connection is expected to be refused; only the log matters.
            pass
        finally:
            # Release the socket. Any log lines this may add come after the
            # connect lines, and ``re.match`` only anchors at the start.
            s.close()

    print(buf.getvalue())
    assert re.match(LOG_TEST_SOCKET, buf.getvalue())

    s = socket.socket()
    try:
        s.connect(('127.0.0.1', 1))
    except Exception:
        pass
    finally:
        s.close()

    assert re.match(LOG_TEST_SOCKET, buf.getvalue())
88
89
90
def test_socket_as_string_target():
    """Run ``test_socket`` with the weave target given as a dotted-name string."""
    dotted_name = 'socket.socket'
    test_socket(target=dotted_name)
92
93
94
def test_socket_meth(meth=socket.socket.close):
    """Weave a single socket method with ``record`` and check calls are captured only while woven.

    :param meth: The method to weave; either the method object or its dotted name.
    """
    calls = []
    with aspectlib.weave(meth, record(calls=calls)):
        s = socket.socket()
        assert s.close() is None
    assert calls == [(s, (), {})]
    del calls[:]
    # NOTE(review): presumably ``record`` does not invoke the wrapped ``close``
    # by default, which would leave the socket above open. Closing it again
    # here is harmless either way and happens outside the weave.
    s.close()

    s = socket.socket()
    assert s.close() is None
    assert calls == []
105
106
107
def test_socket_meth_as_string_target():
    """Run ``test_socket_meth`` with the method given as a dotted-name string."""
    dotted_name = 'socket.socket.close'
    test_socket_meth(meth=dotted_name)
109
110
111
def test_socket_all_methods():
    """Weave every method of ``socket.socket`` and check that ``__init__`` gets logged."""
    buf = StringIO()
    with aspectlib.weave(
        socket.socket,
        aspectlib.debug.log(print_to=buf, stacktrace=False),
        lazy=True,
        methods=aspectlib.ALL_METHODS
    ):
        s = socket.socket()

    try:
        assert "}.__init__ => None" in buf.getvalue()
    finally:
        # The socket only exists to trigger ``__init__``; release it.
        s.close()
122
123
124
def _echo_lines_and_exit(listener):
    """Child-process side: accept one connection and answer every line read with ``-``.

    Never returns. The process always ends through ``os._exit(0)``, so the
    forked child cannot continue into the test run it was forked from.

    :param listener: The listening socket created by the parent before forking.
    """
    try:
        c, _ = listener.accept()
        c.settimeout(1)
        if aspectlib.PY3:
            f = c.makefile('rw', buffering=1)
        else:
            f = c.makefile(bufsize=1)
        while f.readline():
            f.write('-\n')
    finally:
        os._exit(0)


@pytest.mark.skipif(not hasattr(os, 'fork') or PYPY, reason="os.fork not available or PYPY")
def test_realsocket_makefile():
    """Weave the socket classes, talk to a forked echo child and check the logged calls.

    The parent connects to the child through a woven socket and its
    ``makefile`` object, then waits for the expected call/return entries to
    appear in the debug log.
    """
    buf = StringIO()
    p = socket.socket()
    p.bind(('127.0.0.1', 0))
    p.listen(1)
    p.settimeout(1)
    pid = os.fork()

    if not pid:
        _echo_lines_and_exit(p)

    s = fh = None
    try:
        with aspectlib.weave(
            ['socket._fileobject' if aspectlib.PY2 else 'socket.SocketIO'] +
            (['socket.socket', 'socket._realsocket'] if aspectlib.PY2 else ['socket.socket']),
            aspectlib.debug.log(print_to=buf, stacktrace=False),
            lazy=True,
            methods=aspectlib.ALL_METHODS,
        ):
            s = socket.socket()
            s.settimeout(1)
            s.connect(p.getsockname())
            if aspectlib.PY3:
                fh = s.makefile('rwb', buffering=0)
            else:
                fh = s.makefile(bufsize=0)
            fh.write(b"STUFF\n")
            fh.readline()

        with dump_on_error(buf.getvalue):
            wait_for_strings(
                buf.getvalue, 0,
                "}.connect",
                "}.makefile",
                "}.write(",
                "}.send",
                "}.write =>",
                "}.readline()",
                "}.recv",
                "}.readline => ",
            )
    finally:
        try:
            # Compare against None explicitly rather than by truthiness: these
            # objects may have every method woven.
            for resource in (fh, s, p):
                if resource is not None:
                    resource.close()
        finally:
            # Reap the child. Its sockets carry 1-second timeouts, so it
            # cannot block indefinitely.
            os.waitpid(pid, 0)
175
176
177
def test_weave_os_module():
    """Weave the ``os`` module, restricted to ``getenv`` and ``walk``, and check what is recorded."""
    recorded = []
    recorder = record(calls=recorded, extended=True)

    with aspectlib.weave('os', recorder, methods="getenv|walk"):
        os.getenv('BUBU', 'bubu')
        os.walk('.')

    # With ``extended=True`` each entry also carries the dotted function name.
    expected = [
        (None, 'os.getenv', ('BUBU', 'bubu'), {}),
        (None, 'os.walk', ('.',), {}),
    ]
    assert recorded == expected
188