Completed
Push — master ( d357d6...b56762 )
by Konstantinos
14s queued 13s
created

test_notification.subject()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 0
1
import pytest
2
3
from software_patterns import Observer, Subject
4
5
6
def test_observers_sanity_test():
    """Check that two separately built subjects do not share `_observers`."""
    first: Subject = Subject([])
    second: Subject = Subject([])

    for candidate in (first, second):
        assert hasattr(candidate, '_observers')

    # Identity check: each subject must own a distinct container object.
    assert first._observers is not second._observers
12
13
14
# def test_observer_as_constructor(observer: t.Type[Observer]):
15
def test_observer_as_constructor():
    """Verify that the abstract ``Observer`` interface cannot be instantiated.

    Calling ``Observer()`` directly must raise ``TypeError``. The message
    check accepts both wordings CPython has used for this error:

    * ``... with abstract method update`` / ``... with abstract methods update``
    * ``... without an implementation for abstract method 'update'``
      (the wording used from Python 3.12 onwards)

    Raises:
        AssertionError: (via pytest) if no ``TypeError`` is raised or its
            message matches neither wording.
    """
    # Anchored at the start to keep the original `re.match` semantics;
    # `pytest.raises(match=...)` itself performs an `re.search`.
    expected_message = (
        r"^Can't instantiate abstract class Observer "
        r"(?:with abstract methods? update"
        r"|without an implementation for abstract method 'update')"
    )

    with pytest.raises(TypeError, match=expected_message):
        Observer()  # type: ignore[abstract]
30
31
32
# def test_scenario(subject: t.Type[Subject], observer: t.Type[Observer]):
33
def test_scenario():
    """Walk through two attach / notify / detach scenarios end to end.

    Scenario 1 attaches one observer to a plain ``Subject`` and notifies it.
    Scenario 2 uses a ``Subject`` subclass that notifies from inside its own
    business method, with two observers added at once and one detached later.
    """

    # --- Scenario 1: plain subject, single observer -------------------------
    class ObserverA(Observer):
        def update(self, *args, **kwargs) -> None:
            print("ObserverA: Reacted to the event")

    plain_subject: Subject = Subject([])
    lone_observer = ObserverA()
    plain_subject.attach(lone_observer)

    # Client-side business logic: set a state, then broadcast.
    plain_subject.state = 0
    plain_subject.notify()

    # --- Scenario 2: subject subclass carrying its own business logic -------
    class BusinessSubject(Subject):
        def some_business_logic(self) -> None:
            """Change the internal state and notify the observers.

            Stands in for the real work a subject performs besides managing
            subscriptions: something important happens, then notification
            is triggered.
            """
            print("\nSubject: I'm doing something important.")
            self._state = 2
            print(f"Subject: My state has just changed to: {self._state}")
            self.notify()

    class ObserverB(Observer):
        def update(self, *args, **kwargs) -> None:
            # The first positional argument is treated as the subject.
            current_state = args[0].state
            if current_state == 0 or current_state >= 2:
                print("ObserverB: Reacted to the event")

    business_subject = BusinessSubject([])

    # The two subjects, and their observer containers, must be distinct objects.
    assert plain_subject is not business_subject
    assert plain_subject._observers is not business_subject._observers

    observer_a = ObserverA()
    observer_b = ObserverB()
    business_subject.add(observer_a, observer_b)

    # Business logic with both observers registered.
    print(business_subject._observers)
    business_subject.some_business_logic()
    business_subject.some_business_logic()

    # Run once more after detaching the first observer.
    business_subject.detach(observer_a)
    business_subject.some_business_logic()
81