Passed
Pull Request — master (#3)
by Guibert
53s
created

test_decorator   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 120
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 19
eloc 79
dl 0
loc 120
rs 10
c 0
b 0
f 0

16 Functions

Rating   Name   Duplication   Size   Complexity  
A test_inverter() 0 7 2
A a_func() 0 2 1
A test_decorate() 0 7 1
A test_retry_until_success() 0 3 1
A success_func() 0 2 1
A exception_func() 0 2 1
A test_retry_until_failed() 0 3 1
A empty_func() 0 2 1
A failure_func() 0 2 1
A test_always_failure() 0 6 1
A tick() 0 8 3
A test_retry() 0 12 1
A test_root_name() 0 4 1
A test_is_success() 0 6 1
A test_always_success() 0 5 1
A test_is_failure() 0 6 1
1
from contextvars import ContextVar
2
3
import pytest
4
from curio import run
5
6
from libellule.control_flow.common import FAILURE, SUCCESS, ControlFlowException
7
from libellule.control_flow.decorator import *
8
9
10
async def a_func():
    """Return the constant string ``"a"``; used as a truthy, non-status child."""
    return "a"
12
13
14
async def failure_func():
    """Return the ``FAILURE`` status imported from ``control_flow.common``."""
    return FAILURE
16
17
18
async def success_func():
    """Return the ``SUCCESS`` status imported from ``control_flow.common``."""
    return SUCCESS
20
21
22
async def exception_func():
    """Always raise a bare ``RuntimeError``; used as a crashing child.

    Raises:
        RuntimeError: unconditionally.
    """
    raise RuntimeError()
24
25
26
async def empty_func():
    """Return a new empty list; used as a falsy, non-status child."""
    return []
28
29
30
# Countdown shared by tick() and the retry tests: tick() reads it and stores
# value - 1 on every call. It reads as 5 until something calls counter.set().
counter = ContextVar("counter", default=5)
31
32
33
async def tick():
    """Count the shared ``counter`` down by one and report on the value read.

    Returns:
        ``SUCCESS`` when the value read is zero or below, ``FAILURE`` for any
        other value except 3.

    Raises:
        RuntimeError: with message ``"3"`` when the value read is exactly 3.
    """
    value = counter.get()
    # The decrement is stored before any return/raise, so every call —
    # including the one that raises — moves the countdown forward.
    counter.set(value - 1)
    if value <= 0:
        return SUCCESS
    if value == 3:
        raise RuntimeError("3")
    return FAILURE
41
42
43
def test_root_name():
    """Check that ``alias`` records the given name and keeps the child runnable."""
    # NOTE(review): static analysis reports `alias` as undefined; it is
    # presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    rooted = alias(a_func, name="a_func")
    # No name mangling applies here: the attribute access is in a plain
    # function, not inside a class body.
    assert rooted.__node_metadata.name == "a_func"
    assert run(rooted) == "a"
47
48
49
def test_decorate():
    """Check that ``decorate`` feeds the child's result (and kwargs) to the decorator."""

    async def b_decorator(child_value, other=""):
        # Prefix the child's value with "b" and append the optional suffix.
        return f"b{child_value}{other}"

    # NOTE(review): static analysis reports `decorate` as undefined; it is
    # presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert run(decorate(a_func, b_decorator)) == "ba"

    # Extra keyword arguments are expected to reach the decorator function.
    assert run(decorate(a_func, b_decorator, other="c")) == "bac"
56
57
58
def test_always_success():
    """Check ``always_success`` against succeeding, failing, raising and plain children.

    Expected results: ``SUCCESS`` for ``success_func``, ``failure_func`` and
    ``exception_func``; the untouched value ``"a"`` for ``a_func``.
    """
    # NOTE(review): static analysis reports `always_success` as undefined; it
    # is presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert run(always_success(success_func)) == SUCCESS
    assert run(always_success(failure_func)) == SUCCESS
    assert run(always_success(exception_func)) == SUCCESS
    assert run(always_success(a_func)) == "a"
63
64
65
def test_always_failure():
    """Check ``always_failure`` against succeeding, failing, raising and empty children.

    Expected results: ``FAILURE`` for ``success_func`` and ``failure_func``; a
    falsy ``ControlFlowException`` instance for ``exception_func``; the
    untouched ``[]`` for ``empty_func``.
    """
    # NOTE(review): static analysis reports `always_failure` as undefined; it
    # is presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert run(always_failure(success_func)) == FAILURE
    assert run(always_failure(failure_func)) == FAILURE
    # The raising child is run twice on purpose: once to check falsiness and
    # once to check the type of the returned object.
    assert not run(always_failure(exception_func))
    assert isinstance(run(always_failure(exception_func)), ControlFlowException)
    assert run(always_failure(empty_func)) == []
71
72
73
def test_is_success():
    """Check that ``is_success`` is truthy only for successful/truthy children.

    Truthy for ``success_func`` and ``a_func``; falsy for ``failure_func``,
    ``exception_func`` and ``empty_func``.
    """
    # NOTE(review): static analysis reports `is_success` as undefined; it is
    # presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert run(is_success(success_func))
    assert not run(is_success(failure_func))
    assert not run(is_success(exception_func))
    assert run(is_success(a_func))
    assert not run(is_success(empty_func))
79
80
81
def test_is_failure():
    """Check that ``is_failure`` is truthy only for failing/falsy children.

    Truthy for ``failure_func``, ``exception_func`` and ``empty_func``; falsy
    for ``success_func`` and ``a_func``.
    """
    # NOTE(review): static analysis reports `is_failure` as undefined; it is
    # presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert not run(is_failure(success_func))
    assert run(is_failure(failure_func))
    assert run(is_failure(exception_func))
    assert not run(is_failure(a_func))
    assert run(is_failure(empty_func))
87
88
89
def test_inverter():
    """Check that ``inverter`` flips truthiness and lets exceptions propagate.

    Falsy for ``success_func`` and ``a_func``; truthy for ``failure_func`` and
    ``empty_func``; a ``RuntimeError`` from the child is expected to escape.
    """
    # NOTE(review): static analysis reports `inverter` as undefined; it is
    # presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert not run(inverter(success_func))
    assert run(inverter(failure_func))
    with pytest.raises(RuntimeError):
        run(inverter(exception_func))
    assert not run(inverter(a_func))
    assert run(inverter(empty_func))
96
97
98
def test_retry():
    """Check ``retry`` with its default budget, a larger budget and ``max_retry=-1``.

    The first run is expected to end in a falsy ``ControlFlowException``
    (``tick`` raises when it reads 3); a second run continuing the same
    countdown is expected to be truthy.
    """
    # Pin the countdown to the ContextVar's default value so the expectations
    # below do not depend on whether another test already changed `counter`.
    counter.set(5)

    # NOTE(review): static analysis reports `retry` as undefined; it is
    # presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    result = run(retry(tick))
    assert not result
    assert isinstance(result, ControlFlowException)
    # Relies on the countdown left behind by the previous run.
    assert run(retry(tick))

    counter.set(10)
    assert run(retry(tick, max_retry=11))

    counter.set(100)
    assert run(retry(tick, max_retry=-1))
110
111
112
def test_retry_until_success():
    """Check that ``retry_until_success`` gives a truthy result from a countdown of 100."""
    counter.set(100)
    # NOTE(review): static analysis reports `retry_until_success` as
    # undefined; it is presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert run(retry_until_success(tick))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable retry_until_success does not seem to be defined.
Loading history...
115
116
117
def test_retry_until_failed():
    """Check that ``retry_until_failed`` gives a truthy result from a countdown of 100."""
    counter.set(100)
    # NOTE(review): static analysis reports `retry_until_failed` as
    # undefined; it is presumably supplied by the wildcard import from
    # libellule.control_flow.decorator — confirm.
    assert run(retry_until_failed(tick))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable retry_until_failed does not seem to be defined.
Loading history...
120