Passed
Push — master ( 14e0e0...a18443 )
by Konstantinos
01:33
created

SubjectInterface.detach()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
"""Notification-Listener (aka subject-observer) software design pattern.
2
3
Simple implementation of the subject/observers (broadcast/listeners) pattern,
4
exposed as python classes.
5
6
This is a Behavioural Pattern which can be used when you want one or more
7
components to be notified and react accordingly, when 'something happens'.
8
9
One entity, known as Subject, is responsible to send out a notification and each
10
entity "subscribed" to the Subject receives it and reacts.
11
Each subscribed entity is known as a Listener or Observer.
12
13
The idea is that the Subject is agnostic of its Observers implementation and the
14
client code can "attach" or "detach" (subscribe/unsubscribe) as many of them at
15
runtime.
16
17
This module provides the Observer class, serving as the interface that needs to
18
be implemented by concrete classes; the update method needs to be overrode.
19
Concrete Observers react to the notifications/updates issued by the Subject they
20
had been attached/subscribed to.
21
22
This module also provides the concrete Subject class, serving with methods to
23
subscribe/unsubscribe (attach/detach) observers and also with a method to
24
"notify" all Observers.
25
"""
26
27
from abc import ABC, abstractmethod
28
from typing import Generic, List, TypeVar, Union
29
30
__all__ = ['Subject', 'Observer']
31
32
33
T = TypeVar('T')
34
StateVariableType = Union[T, None]
35
36
37
class ObserverInterface(ABC):
38
    """The Observer interface declares the update method, used by subjects.
39
40
    Enables objects to act as "event" listeners; react to "notifications"
41
    by executing specific (event) handling logic.
42
    """
43
44
    @abstractmethod
45
    def update(self, *args, **kwargs) -> None:
46
        """Receive an update (from a subject); handle an event notification."""
47
        raise NotImplementedError
48
49
50
class SubjectInterface(ABC):
51
    """The Subject interface declares a set of methods for managing subscribers.
52
53
    Enables objects to act as subjects that broadcast events" by notifying
54
    all subscribed observers/listeners.
55
    """
56
57
    @abstractmethod
58
    def attach(self, observer: ObserverInterface) -> None:
59
        """Attach an observer to the subject; subscribe the observer."""
60
        raise NotImplementedError
61
62
    @abstractmethod
63
    def detach(self, observer: ObserverInterface) -> None:
64
        """Detach an observer from the subject; unsubscribe the observer."""
65
        raise NotImplementedError
66
67
    @abstractmethod
68
    def notify(self) -> None:
69
        """Notify all observers about an event."""
70
        raise NotImplementedError
71
72
73
class Observer(ObserverInterface, ABC):
74
    pass
75
76
77
# Result object for add method
78
class AddObserversResult:
79
    """Result of adding multiple observers: contains added and failed lists."""
80
81
    def __init__(self, added: List[ObserverInterface], failed: List[ObserverInterface]):
82
        self.added = added
83
        self.failed = failed
84
85
86
class Subject(SubjectInterface, Generic[T]):
87
    import asyncio
88
    import inspect
89
90
    """Concrete Subject which owns an important state and notifies observers.
91
92
    The subject can be used to build the data encapsulating the event being
93
    broadcasted.
94
95
    Both the _state and _observers attributes have a simple implementation,
96
    but can be overrode to accommodate for more complex scenarios.
97
98
    The observers/subscribers are implemented as a python list.
99
    In more complex scenarios, the list of subscribers can
100
    be stored more comprehensively (categorized by event type, etc.).
101
102
    The subscription management methods provided are 'attach', 'detach' (as in
103
    the SubjectInterface) and 'add', which attached multiple observers at once.
104
105
    Example:
106
107
        >>> from software_patterns import Subject, Observer
108
109
        >>> broadcaster = Subject()
110
111
        >>> class ObserverTypeA(Observer):
112
        ...  def update(self, *args, **kwargs):
113
        ...   event = args[0].state
114
        ...   print(f'observer-type-a reacts to event {event}')
115
116
        >>> class ObserverTypeB(Observer):
117
        ...  def update(self, *args, **kwargs):
118
        ...   event = args[0].state
119
        ...   print(f'observer-type-b reacts to event {event}')
120
121
        >>> subscriber_1 = ObserverTypeA()
122
        >>> subscriber_2 = ObserverTypeB()
123
124
        >>> broadcaster.add(subscriber_2, subscriber_1)
125
126
        >>> broadcaster.state = 'event-object-A'
127
128
        >>> broadcaster.notify()
129
        observer-type-b reacts to event event-object-A
130
        observer-type-a reacts to event event-object-A
131
132
        >>> broadcaster.detach(subscriber_2)
133
134
        >>> broadcaster.state = 'event-object-B'
135
        >>> broadcaster.notify()
136
        observer-type-a reacts to event event-object-B
137
    """
138
139
    def __init__(self, *args, **kwargs):
140
        self._observers: List[ObserverInterface] = []
141
        self._state = StateVariableType
142
143
    def attach(self, observer: ObserverInterface) -> None:
144
        # Early fail if observer does not have an 'update' callable
145
        if not callable(getattr(observer, 'update', None)):
146
            raise TypeError(
147
                f"Attached observer {observer!r} does not have a callable 'update' method."
148
            )
149
        self._observers.append(observer)
150
151
    def detach(self, observer: ObserverInterface) -> None:
152
        self._observers.remove(observer)
153
154
    def notify(self) -> None:
155
        for observer in self._observers:
156
            observer.update(self)
157
158
    async def notify_async(self) -> None:
159
        """Notify all observers, awaiting async ones and calling sync ones."""
160
        tasks = []
161
        for observer in self._observers:
162
            update = getattr(observer, 'update', None)
163
            if update is None:
164
                continue
165
            if self.inspect.iscoroutinefunction(update):
166
                tasks.append(update(self))
167
            else:
168
                update(self)
169
        if tasks:
170
            await self.asyncio.gather(*tasks)
171
172
    def add(self, *observers):
173
        """Subscribe multiple observers at once. Returns AddObserversResult.
174
175
        In case some observers are incompatible (do not have 'update' method), they
176
        are ignored and returned as part of the 'failed' list in the result.
177
178
        Args:
179
            observers (ObserverInterface): variable number of observers to attach
180
181
        Returns:
182
            AddObserversResult: with 'added' and 'failed' lists of observers
183
        """
184
        # find compatible observers/listeners/subscribers, by checking the 'update' attribute
185
        compatible_listeners = [
186
            obs for obs in observers if callable(getattr(obs, 'update', None))
187
        ]
188
189
        # add compatible listenrs to list of subscribers
190
        self._observers.extend(compatible_listeners)
191
192
        return AddObserversResult(
193
            compatible_listeners, [x for x in observers if x not in compatible_listeners]
194
        )
195
196
    @property
197
    def state(self) -> StateVariableType:
198
        """Get the state of the Subject.
199
200
        Returns:
201
            StateType: the object representing the current state of the Subject
202
        """
203
        return self._state
204
205
    @state.setter
206
    def state(self, state: T):
207
        """Set the state of the Subject.
208
209
        Args:
210
            state (StateType): the state object
211
        """
212
        self._state = state
213