|
1
|
|
|
import typing as t |
|
2
|
|
|
|
|
3
|
|
|
import pytest |
|
4
|
|
|
|
|
5
|
|
|
from software_patterns import Observer, Subject |
|
6
|
|
|
|
|
7
|
|
|
# class SubjectLike(t.Protocol): |
|
8
|
|
|
# state: t.Any |
|
9
|
|
|
|
|
10
|
|
|
# class ObserverLike(t.Protocol): |
|
11
|
|
|
# update: t.Callable[[SubjectLike, t.Any], None] |
|
12
|
|
|
|
|
13
|
|
|
|
|
14
|
|
|
# @pytest.fixture |
|
15
|
|
|
# def subject(): |
|
16
|
|
|
# from software_patterns import Subject |
|
17
|
|
|
|
|
18
|
|
|
# return Subject |
|
19
|
|
|
|
|
20
|
|
|
|
|
21
|
|
|
# @pytest.fixture |
|
22
|
|
|
# def observer() -> t.Type[Observer]: |
|
23
|
|
|
# """Observer Abstract Class that acts as an Observer interface. |
|
24
|
|
|
|
|
25
|
|
|
# The Client code is expected to implement the 'update' method of the |
|
26
|
|
|
# interface. |
|
27
|
|
|
# """ |
|
28
|
|
|
# from software_patterns import Observer |
|
29
|
|
|
|
|
30
|
|
|
# return Observer |
|
31
|
|
|
|
|
32
|
|
|
|
|
33
|
|
|
def test_observers_sanity_test():
    """Check that two Subject instances do not share one observers container."""
    first: Subject = Subject([])
    second: Subject = Subject([])

    # Both instances are expected to expose the private '_observers' attribute.
    for candidate in (first, second):
        assert hasattr(candidate, '_observers')

    # Identity (not equality) is what matters: the containers must be
    # distinct objects so attaching to one subject never leaks to the other.
    assert first._observers is not second._observers
|
39
|
|
|
|
|
40
|
|
|
|
|
41
|
|
|
# def test_observer_as_constructor(observer: t.Type[Observer]): |
|
42
|
|
|
def test_observer_as_constructor():
    """Verify that the abstract Observer class cannot be instantiated directly.

    Calling ``Observer()`` must raise ``TypeError`` and the error message must
    name the ``update`` method as the missing implementation.
    """
    import re

    observer = Observer

    with pytest.raises(TypeError) as instantiation_from_interface_error:
        observer()  # type: ignore[abstract]

    # The wording of this TypeError depends on the interpreter version:
    #   older : "... class Observer with abstract method(s) update"
    #   3.12+ : "... class Observer without an implementation for abstract
    #            method 'update'"
    # The pattern accepts both so the test is not tied to one Python release.
    runtime_exception_message_reg = (
        r"Can't instantiate abstract class Observer "
        r"with(?:out an implementation for)? abstract methods? '?update'?"
    )

    assert re.match(
        runtime_exception_message_reg, str(instantiation_from_interface_error.value)
    )
|
57
|
|
|
|
|
58
|
|
|
|
|
59
|
|
|
# def test_scenario(subject: t.Type[Subject], observer: t.Type[Observer]): |
|
60
|
|
|
def test_scenario():
    """Exercise two usage scenarios of the Subject / Observer pair.

    Scenario 1 attaches a single observer to a plain Subject, sets its state
    and notifies. Scenario 2 uses a Subject subclass whose business method
    changes the state and notifies by itself; two observers are added and one
    of them is detached before the last run.
    """

    # Client-side implementations used by the scenarios below.
    class ObserverA(Observer):
        def update(self, *args, **kwargs) -> None:
            print("ObserverA: Reacted to the event")

    class ObserverB(Observer):
        def update(self, *args, **kwargs) -> None:
            # The first positional argument is read as the notifying subject.
            emitter = args[0]
            current = emitter.state
            if current == 0 or current >= 2:
                print("ObserverB: Reacted to the event")

    class Businessubject(Subject):
        def some_business_logic(self) -> None:
            """Change the state, report it, then notify.

            Stands in for the real work a Subject would do besides managing
            its subscriptions.
            """
            print("\nSubject: I'm doing something important.")
            self._state = 2
            print(f"Subject: My state has just changed to: {self._state}")
            self.notify()

    # Scenario 1: plain Subject with one attached observer.
    plain_subject: Subject = Subject([])
    lone_listener = ObserverA()
    plain_subject.attach(lone_listener)

    plain_subject.state = 0
    plain_subject.notify()

    # Scenario 2: Subject subclass carrying its own business logic.
    business_subject = Businessubject([])
    assert plain_subject is not business_subject
    assert plain_subject._observers is not business_subject._observers

    listener_a = ObserverA()
    listener_b = ObserverB()
    business_subject.add(listener_a, listener_b)

    print(business_subject._observers)
    business_subject.some_business_logic()
    business_subject.some_business_logic()

    # After detaching, only the remaining observer should be notified.
    business_subject.detach(listener_a)
    business_subject.some_business_logic()
|
108
|
|
|
|