Passed
Push — dev ( 4bfa05...67478a )
by Konstantinos
01:31
created

test_notification   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 103
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 9
eloc 69
dl 0
loc 103
rs 10
c 0
b 0
f 0

6 Functions

Rating   Name   Duplication   Size   Complexity  
A test_scenario() 0 53 4
A observer_class() 0 4 1
A test_attrs_sanity1() 0 15 1
A subject() 0 4 1
A test_attrs_sanity() 0 10 1
A test_observers_sanity_test1() 0 4 1
1
import pytest
2
3
@pytest.fixture
def subject():
    """Provide the ``Subject`` class from ``green_magic.utils``.

    The class itself (not an instance) is returned, so a test can both
    instantiate it and subclass it. The import happens inside the fixture,
    i.e. only when a test actually requests it.
    """
    from green_magic.utils import Subject as subject_type

    return subject_type
7
8
9
@pytest.fixture
def observer_class():
    """Provide the ``Observer`` class from ``green_magic.utils``.

    The class itself (not an instance) is returned, so a test can both
    instantiate it and subclass it. The import happens inside the fixture,
    i.e. only when a test actually requests it.
    """
    from green_magic.utils import Observer as observer_type

    return observer_type
13
14
15
def test_attrs_sanity():
    """Pin down the shared-mutable-default pitfall of ``attr.ib``.

    A literal ``[]`` passed as ``default`` is asserted to be one and the same
    list object on every instance built without an explicit value.
    """
    import attr

    @attr.s
    class A:
        _observers = attr.ib(init=True, default=[])

    first, second = A(), A()
    # WARNING: both instances hold the very same list object.
    assert first._observers is second._observers
25
26
27
def test_attrs_sanity1():
    """Check that instances given their own list do not share observers.

    Each ``A`` is constructed with a fresh ``[]``; adding an item through one
    instance must leave the other instance's list empty, and the two lists
    must remain distinct objects before and after the mutation.
    """
    import attr

    @attr.s
    class A:
        _observers = attr.ib(init=True)

        def add(self, *objects):
            # ``extend`` accepts any iterable, so the varargs tuple is passed
            # directly instead of being copied into a temporary list first.
            self._observers.extend(objects)

    first = A([])
    second = A([])
    assert first._observers is not second._observers

    first.add('obs1')
    assert first._observers == ['obs1']
    assert second._observers == []
    assert first._observers is not second._observers
42
43
44
def test_observers_sanity_test1(subject):
    """Two subjects built from separate lists must not share ``_observers``."""
    first, second = subject([]), subject([])
    assert first._observers is not second._observers
48
49
50
def test_scenario(subject, observer_class):
    """Walk through two attach/notify scenarios as client code would.

    Scenario 1 attaches a plain ``observer_class`` instance to a subject,
    sets its state to 0 and notifies.

    Scenario 2 subclasses the subject with a method that picks a random
    state and notifies, registers two custom observers through ``add``,
    triggers the method twice, detaches the first observer and triggers it
    once more.

    The only assertions are that the two subjects, and their observer
    lists, are distinct objects; everything else is exercised for the
    absence of errors and reports via ``print``.
    """
    print("------ Scenario 1 ------\n")

    # Defined here, but only instantiated in scenario 2.
    class ObserverA(observer_class):
        def update(self, a_subject) -> None:
            if a_subject.state == 0:
                print("ObserverA: Reacted to the event")

    plain_subject = subject([])
    base_observer = observer_class()
    plain_subject.attach(base_observer)

    # Business logic: change the state, then tell the observers.
    plain_subject.state = 0
    plain_subject.notify()

    print("------ Scenario 2 ------\n")

    class Businessubject(subject):

        def some_business_logic(self) -> None:
            """Pick a new random state in [0, 10) and notify the observers.

            Stands in for the real work a subject performs around its
            notifications.
            """
            import random

            print("\nSubject: I'm doing something important.")
            self._state = random.randrange(10)
            print(f"Subject: My state has just changed to: {self._state}")
            self.notify()

    class ObserverB(observer_class):
        def update(self, a_subject) -> None:
            current = a_subject.state
            if current == 0 or current >= 2:
                print("ObserverB: Reacted to the event")

    busy_subject = Businessubject([])
    assert plain_subject is not busy_subject
    assert plain_subject._observers is not busy_subject._observers

    observer_a, observer_b = ObserverA(), ObserverB()
    # Register both observers in a single call.
    busy_subject.add(observer_a, observer_b)

    # Business logic: two rounds with both observers registered ...
    print(busy_subject._observers)
    busy_subject.some_business_logic()
    busy_subject.some_business_logic()

    # ... and one more round after the first observer is detached.
    busy_subject.detach(observer_a)
    busy_subject.some_business_logic()
103