tests.conftest   B
last analyzed

Complexity

Total Complexity 44

Size/Duplication

Total Lines 328
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 192
dl 0
loc 328
rs 8.8798
c 0
b 0
f 0
wmc 44

12 Functions

Rating   Name   Duplication   Size   Complexity  
A run_cli() 0 26 4
A watcher() 0 16 1
A call_event_from_name() 0 11 3
A pytest_addoption() 0 12 1
A clean_test_database() 0 13 2
A pytest_configure() 0 29 1
A pytest_unconfigure() 0 5 1
A call_event() 0 4 1
A wait_for() 0 11 5
A reset_base() 0 16 3
A clean_test_components() 0 8 2
A manager() 0 25 2

6 Methods

Rating   Name   Duplication   Size   Complexity  
A WaitEvent.wait() 0 9 3
A Watcher._on_event() 0 4 2
A Watcher.clear() 0 4 1
A WaitEvent.__init__() 0 18 2
C Watcher.wait() 0 12 9
A Watcher.__init__() 0 4 1

How to fix   Complexity   

Complexity

Complex classes like tests.conftest often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
# Isomer - The distributed application framework
5
# ==============================================
6
# Copyright (C) 2011-2020 Heiko 'riot' Weinen <[email protected]> and others.
7
#
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU Affero General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
# GNU Affero General Public License for more details.
17
#
18
# You should have received a copy of the GNU Affero General Public License
19
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21
import os
22
import sys
23
import shutil
24
import threading
25
import collections
26
from time import sleep, strftime
27
from collections import deque
28
29
import pytest
30
import pymongo
31
from click.testing import CliRunner
32
from circuits.core.manager import TIMEOUT
33
from circuits import handler, BaseComponent, Debugger, Manager
34
from formal import model_factory
35
from isomer.database import initialize
36
from isomer.component import ConfigurableComponent
37
from isomer.misc.path import set_etc_path, set_instance
38
from isomer.schemata.component import ComponentConfigSchemaTemplate
39
from isomer.tool.etc import create_configuration
40
41
"""Basic Test suite bits and pieces"""
42
43
# Fallback values for the --dbname, --dbhost and --dbport command line options
# registered in pytest_addoption()
DEFAULT_DATABASE_NAME = "isomer-test-internal"
DEFAULT_DATABASE_HOST = "localhost"
DEFAULT_DATABASE_PORT = 27017

# Instance name used by pytest_configure() when no --instance option is set
DEFAULT_INSTANCE_NAME = "isomer-test-internal"

# While this is False, run_cli() prepends '-nc' to the command arguments
COLORS = False
48
49
50
class TestComponent(ConfigurableComponent):
    """Minimal configurable component that serves as a test subject"""

    # Single string-typed configuration property named 'test'
    configprops = {'test': {'type': 'string'}}
56
57
58
class Watcher(BaseComponent):
    """Records every event it receives so tests can wait for specific ones"""

    def __init__(self, *args, **kwargs):
        """Initialize the component and its lock-protected event store

        All arguments are handed through unchanged to the base class.
        """

        # The one-argument form ``super(Watcher)`` creates an unbound super
        # object, so the base class initializer never ran for this instance.
        # Binding to ``self`` performs the intended initialization.
        super(Watcher, self).__init__(*args, **kwargs)
        self.events = deque()
        self._lock = threading.Lock()

    @handler(channel="*", priority=999.9)
    def _on_event(self, event, *args, **kwargs):
        """Store any event arriving on any channel"""

        with self._lock:
            self.events.append(event)

    def clear(self):
        """Reset caught events"""

        # Take the same lock as _on_event() and wait(), so the deque is never
        # emptied while wait() is iterating over it.
        with self._lock:
            self.events.clear()

    def wait(self, name, channel=None, timeout=6.0):
        """Linger and wait for specified incoming events

        :param name: name of the event to look for
        :param channel: if given, the event must also list this channel
        :param timeout: maximum time to wait; polled in steps of circuits'
            TIMEOUT constant
        :return: True as soon as a matching recorded event has no waiting
            handlers left, False if none showed up in time
        """

        for _ in range(int(timeout / TIMEOUT)):
            with self._lock:
                for event in self.events:
                    if event.name != name or event.waitingHandlers != 0:
                        continue
                    if channel is None or channel in event.channels:
                        return True
            # Sleep outside of the lock so _on_event() can record new events
            sleep(TIMEOUT)

        return False
88
89
90
class Flag(object):
    """Mutable result holder filled in by the handler WaitEvent installs"""

    # The received event object; None as long as nothing has arrived
    event = None
    # Switched to True once the awaited event has been received
    status = False
94
95
96
def call_event_from_name(manager, event, event_name, *channels):
    """Fire an event, then keep iterating until the named event was awaited

    The event is fired on the first step of ``manager.waitEvent(event_name)``
    only; every step is followed by a short pause. If the wait yields no
    steps at all, nothing is fired and None is returned.

    :param manager: object providing ``waitEvent`` and ``fire``
    :param event: the event to fire
    :param event_name: name of the event to wait for
    :param channels: channels handed through to ``manager.fire``
    :return: whatever ``manager.fire`` returned, or None if it never ran
    """

    result = None

    for step, _ in enumerate(manager.waitEvent(event_name)):
        if step == 0:
            # Only the very first iteration triggers the event
            result = manager.fire(event, *channels)
        sleep(0.1)

    return result
107
108
109
def call_event(manager, event, *channels):
    """Fire an event and wait for an event carrying that same name

    Thin wrapper around call_event_from_name() that uses the event's own
    name as the name to wait for.
    """

    own_name = event.name
    return call_event_from_name(manager, event, own_name, *channels)
113
114
115
class WaitEvent(object):
    """Component substitute that waits for one specific event on a manager"""

    def __init__(self, manager, name, channel=None, timeout=2):
        """Install a handler for the named event on the given manager

        :param manager: object to attach the temporary handler to
        :param name: name of the event to wait for
        :param channel: channel to listen on; defaults to the manager's
            ``channel`` attribute when that exists
        :param timeout: maximum time wait() keeps polling
        """

        self.timeout = timeout
        self.manager = manager

        # Without an explicit channel, fall back to the manager's own one
        listen_on = channel
        if listen_on is None:
            listen_on = getattr(manager, "channel", None)

        received = Flag()

        @handler(name, channel=listen_on)
        def on_event(self, event):
            """Remember that (and which) event has been received"""

            received.status = True
            received.event = event

        self.handler = self.manager.addHandler(on_event)
        self.flag = received

    def wait(self):
        """Poll until the event arrived or the timeout has elapsed

        The handler is removed from the manager in either case.

        :return: the received event, or None if it did not arrive in time
        """

        try:
            polls_left = int(self.timeout / TIMEOUT)
            while polls_left > 0:
                if self.flag.status:
                    return self.flag.event
                sleep(TIMEOUT)
                polls_left -= 1
            return None
        finally:
            self.manager.removeHandler(self.handler)
146
147
148
def wait_for(obj, attr, value=True, timeout=3.0):
    """Wait until timeout or an object acquires a specified attribute

    :param obj: the object to inspect
    :param attr: name of the attribute to check
    :param value: either the value the attribute has to equal, or a callable
        that is invoked as ``value(obj, attr)`` and signals success by
        returning something truthy
    :param timeout: maximum time to wait; polled in steps of circuits'
        TIMEOUT constant
    :return: True once the condition is met, None if the timeout elapsed
    """

    from circuits.core.manager import TIMEOUT

    # collections.Callable is gone since Python 3.10 (the ABCs live in
    # collections.abc); the builtin callable() performs the same check on
    # every Python version.
    is_predicate = callable(value)

    for _ in range(int(timeout / TIMEOUT)):
        if is_predicate:
            if value(obj, attr):
                return True
        elif getattr(obj, attr) == value:
            return True
        sleep(TIMEOUT)
159
160
161
@pytest.fixture
def manager(request):
    """Provide a started circuits Manager that gets stopped on teardown"""

    test_manager = Manager()

    # Registered before starting, so the manager is stopped again even if
    # the startup assertion below fails
    request.addfinalizer(test_manager.stop)

    startup = WaitEvent(test_manager, "started")
    test_manager.start()
    assert startup.wait()

    # Let the debugger report events only when pytest runs verbosely
    show_events = bool(request.config.option.verbose)
    Debugger(events=show_events).register(test_manager)

    return test_manager
186
187
188
@pytest.fixture
def watcher(request, manager):
    """Provide a Watcher registered on the test manager

    On teardown the watcher is unregistered again and the fixture waits for
    the matching "unregistered" event.
    """

    event_watcher = Watcher().register(manager)

    def teardown():
        """Unregister the watcher and wait for the confirmation event"""

        # The waiter has to exist before unregistering to not miss the event
        unregistered = WaitEvent(manager, "unregistered")
        event_watcher.unregister()
        unregistered.wait()

    request.addfinalizer(teardown)

    return event_watcher
204
205
206
def run_cli(cmd, args, full_log=False):
    """Run a click command with the arguments for the testing instance

    The given arguments are prefixed with the instance name, prefix path and
    configuration path of the test environment. The output of every
    invocation is appended to '/tmp/isomer_test_run_cli_logfile'.

    :param cmd: the click command to invoke
    :param args: sequence of command line arguments; it is not modified
    :param full_log: if True, also prepend the '--clog', '--flog' and
        '--log-file' arguments, the latter pointing to a timestamped file
    :return: the result object returned by ``CliRunner.invoke``
    """

    # Work on a copy: inserting into the caller's list would leak '-nc' (once
    # per call) into every later invocation reusing the same argument list
    args = list(args)

    if COLORS is False:
        args.insert(0, '-nc')

    if full_log:
        timestamp = strftime("%Y%m%d-%H%M%S")

        log_args = [
            '--clog', '5', '--flog', '5',
            '--log-file', '/tmp/isomer-test_%s' % timestamp
        ]
        args = log_args + args

    args = ['--config-path', '/tmp/isomer-test/etc/isomer'] + args
    args = ['--prefix-path', '/tmp/isomer-test'] + args
    args = ['--instance', pytest.INSTANCENAME] + args

    runner = CliRunner()
    result = runner.invoke(cmd, args, catch_exceptions=True, obj={})

    with open('/tmp/isomer_test_run_cli_logfile', 'a') as f:
        f.write(result.output)

    return result
232
233
234
def reset_base(unset_instance=False):
    """Recreate the testing folder and point Isomer's configuration at it

    :param unset_instance: unless this is exactly False, the final step of
        setting the 'foobar' instance is skipped
    """

    class ctx_mock():
        # Stand-in for the context object create_configuration() expects
        obj = {'config': None}

    test_root = '/tmp/isomer-test'
    etc_path = '/tmp/isomer-test/etc/isomer'

    # Always start from an empty directory tree
    if os.path.exists(test_root):
        shutil.rmtree(test_root)

    for folder in (
        '/tmp/isomer-test/etc/isomer/instances',
        '/tmp/isomer-test/var/log/isomer',
    ):
        os.makedirs(folder)

    set_etc_path(etc_path)
    create_configuration(ctx_mock())

    if unset_instance is not False:
        return

    set_instance('foobar', 'green', '/tmp/isomer-test/')
250
251
252
def clean_test_components():
    """Delete all stored component configurations of TestComponent"""

    print("Removing test components...")

    config_model = model_factory(ComponentConfigSchemaTemplate)
    query = {'componentclass': 'TestComponent'}

    for stale_config in config_model.find(query):
        stale_config.delete()
260
261
262
def clean_test_database(config):
    """Drop the test database, if it exists

    Name, host and port of the database are read from the --dbname, --dbhost
    and --dbport options of the given pytest configuration, falling back to
    the module level defaults.

    :param config: the pytest configuration object
    """

    db_name = config.getoption("--dbname", default=DEFAULT_DATABASE_NAME)
    host = config.getoption("--dbhost", default=DEFAULT_DATABASE_HOST)
    port = config.getoption("--dbport", default=DEFAULT_DATABASE_PORT)

    client = pymongo.MongoClient(host=host, port=int(port))

    if db_name not in client.list_database_names():
        print("Test database does not exist")
        return

    print("Dropping test database", db_name)
    client.drop_database(db_name)
275
276
277
@pytest.hookimpl()
def pytest_unconfigure(config):
    """Drop the test database again when pytest shuts down

    :param config: the pytest configuration object
    """

    clean_test_database(config)
282
283
284
def pytest_addoption(parser):
    """Register the command line options for the test database

    Adds --dbname, --dbhost and --dbport, each defaulting to the matching
    DEFAULT_DATABASE_* constant of this module.

    :param parser: the pytest command line parser
    """

    parser.addoption(
        "--dbname", action="store", default=DEFAULT_DATABASE_NAME,
        help="test db name"
    )
    parser.addoption(
        "--dbhost", action="store", default=DEFAULT_DATABASE_HOST,
        help="test db hostname"
    )
    # Hand over the type object itself: string type names like "int" are a
    # long deprecated pytest feature that current releases no longer accept
    parser.addoption(
        "--dbport", action="store", default=DEFAULT_DATABASE_PORT,
        help="test db port", type=int
    )
297
298
299
def pytest_configure(config):
    """Populate the shared pytest namespace and prepare the test database

    Helpers, classes and settings of this module are attached as attributes
    to the ``pytest`` module. Afterwards the test database is dropped and
    the database is initialized with the configured address and names.

    :param config: the pytest configuration object
    """

    option = config.getoption

    dbname = option("--dbname", default=DEFAULT_DATABASE_NAME)
    dbhost = option("--dbhost", default=DEFAULT_DATABASE_HOST)
    dbport = option("--dbport", default=DEFAULT_DATABASE_PORT)
    instance_name = option("--instance", default=DEFAULT_INSTANCE_NAME)

    namespace = {
        'TestComponent': TestComponent,
        'clean_test_components': clean_test_components,
        'WaitEvent': WaitEvent,
        'wait_for': wait_for,
        'call_event': call_event,
        'PLATFORM': sys.platform,
        'PYVER': sys.version_info[:3],
        'DBNAME': dbname,
        'DBHOST': dbhost,
        'DBPORT': dbport,
        'INSTANCENAME': instance_name,
        'call_event_from_name': call_event_from_name,
        'run_cli': run_cli,
        'reset_base': reset_base,
    }
    for attribute, content in namespace.items():
        setattr(pytest, attribute, content)

    # Start every test session from an empty database
    clean_test_database(config)

    address = "%s:%i" % (dbhost, dbport)
    initialize(
        address=address,
        database_name=dbname,
        instance_name=instance_name
    )
329