tests.unit.test_core.test_helpers   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 125
Duplicated Lines 0 %

Test Coverage

Coverage 94.52%

Importance

Changes 0
Metric Value
eloc 82
dl 0
loc 125
ccs 69
cts 73
cp 0.9452
rs 10
c 0
b 0
f 0
wmc 11

7 Methods

Rating   Name   Duplication   Size   Complexity  
A TestHelpers.test_listen_to_run_on_threadpool_by_default() 0 15 1
A TestHelpers.test_get_time__str() 0 10 1
A TestHelpers.test_run_on_thread() 0 12 1
A TestHelpers.test_get_time__dict() 0 15 1
A TestHelpers.test_get_time__none() 0 5 1
A TestHelpers.test_listen_to_dynamic_single_executors() 0 18 2
A TestHelpers.test_default_executors() 0 6 1

2 Functions

Rating   Name   Duplication   Size   Complexity  
A test_alisten_to() 0 15 1
A test_load_spec() 0 6 2
1
"""Test kytos.core.helpers module."""
2 1
from unittest.mock import MagicMock, patch
3
4 1
from kytos.core import helpers
5 1
from kytos.core.helpers import (alisten_to, ds_executors, executors,
6
                                get_thread_pool_max_workers, get_time,
7
                                listen_to, load_spec, run_on_thread)
8
9
10 1
async def test_alisten_to():
11
    """Test alisten_to decorator."""
12
13 1
    class SomeClass:
14
        """SomeClass."""
15
16 1
        @alisten_to("some_event")
17 1
        async def on_some_event(self, event):
18
            """On some event handler."""
19 1
            _ = event
20 1
            return "some_response"
21
22 1
    assert SomeClass.on_some_event.__name__ == "inner"
23 1
    result = await SomeClass().on_some_event(MagicMock())
24 1
    assert result == "some_response"
25
26
27 1
def test_load_spec(monkeypatch, minimal_openapi_spec_dict) -> None:
28
    """Test load spec."""
29 1
    monkeypatch.setattr(helpers, "_read_from_filename",
30
                        lambda x: minimal_openapi_spec_dict)
31 1
    spec = load_spec("mocked_path")
32 1
    assert spec.accessor.lookup == minimal_openapi_spec_dict
33
34
35 1
class TestHelpers:
36
    """Test the helpers methods."""
37
38 1
    @staticmethod
39 1
    @patch('kytos.core.helpers.Thread')
40 1
    def test_run_on_thread(mock_thread):
41
        """Test run_on_thread decorator."""
42
43 1
        @run_on_thread
44 1
        def test():
45
            pass
46
47 1
        test()
48
49 1
        mock_thread.return_value.start.assert_called()
50
51 1
    @staticmethod
52 1
    def test_default_executors():
53
        """Test default expected executors."""
54 1
        pools = ["api", "app", "db", "sb"]
55 1
        assert sorted(pools) == sorted(executors.keys())
56 1
        assert not ds_executors
57
58 1
    @staticmethod
59 1
    def test_listen_to_run_on_threadpool_by_default():
60
        """Test listen_to runs on ThreadPoolExecutor by default."""
61
62 1
        assert get_thread_pool_max_workers()
63 1
        assert "app" in executors
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable executors does not seem to be defined.
Loading history...
64 1
        executors["app"] = MagicMock()
65 1
        mock_executor = executors["app"]
66
67 1
        @listen_to("some_event")
68 1
        def test(event):
69
            _ = event
70
71 1
        assert test({}) is None
72 1
        mock_executor.submit.assert_called()
73
74 1
    @staticmethod
75 1
    def test_listen_to_dynamic_single_executors():
76
        """Test listen_to dynamic_single executor."""
77
78 1
        assert len(ds_executors) == 0
79
80 1
        @listen_to("some_event", pool="dynamic_single")
81 1
        def some_handler(event):
82
            _ = event
83
84 1
        @listen_to("some_event", pool="dynamic_single")
85 1
        def another_handler(event):
86
            _ = event
87
88 1
        for _ in range(2):
89 1
            some_handler(MagicMock())
90 1
            another_handler(MagicMock())
91 1
        assert len(ds_executors) == 2
92
93 1
    def test_get_time__str(self):
94
        """Test get_time method passing a string as parameter."""
95 1
        date = get_time("2000-01-01T00:30:00")
96
97 1
        assert date.year == 2000
98 1
        assert date.month == 1
99 1
        assert date.day == 1
100 1
        assert date.hour == 0
101 1
        assert date.minute == 30
102 1
        assert date.second == 0
103
104 1
    def test_get_time__dict(self):
105
        """Test get_time method passing a dict as parameter."""
106 1
        date = get_time({"year": 2000,
107
                         "month": 1,
108
                         "day": 1,
109
                         "hour": 00,
110
                         "minute": 30,
111
                         "second": 00})
112
113 1
        assert date.year == 2000
114 1
        assert date.month == 1
115 1
        assert date.day == 1
116 1
        assert date.hour == 0
117 1
        assert date.minute == 30
118 1
        assert date.second == 0
119
120 1
    def test_get_time__none(self):
121
        """Test get_time method by not passing a parameter."""
122 1
        date = get_time()
123
124
        assert date is None
125