async_rx.protocol.definition   A
last analyzed

Complexity

Total Complexity 29

Size/Duplication

Total Lines 343
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 29
eloc 111
dl 0
loc 343
rs 10
c 0
b 0
f 0

29 Methods

Rating   Name   Duplication   Size   Complexity  
A Collector.has_error() 0 3 1
A ObservableFactory.__call__() 0 8 1
A RefCountHandler.__call__() 0 2 1
A ConnectableObservable.ref_count() 0 8 1
A Observer.on_error() 0 2 1
A Collector.is_finish() 0 3 1
A Observable.subscribe() 0 2 1
A _SyncAccumulatorOperator.__call__() 0 2 1
A SubjectEventHandler.__call__() 0 2 1
A _AsyncAccumulatorOperator.__call__() 0 2 1
A ConnectableObservableHandler.on_connect() 0 3 1
A _SyncPredicateOperator.__call__() 0 2 1
A ConnectableObservableEventHandler.__call__() 0 2 1
A Collector.error() 0 3 1
A Subscription.__call__() 0 3 1
A _AsyncPredicateOperator.__call__() 0 2 1
A ConnectHandler.__call__() 0 2 1
A Subscribe.__call__() 0 11 1
A ConnectableObservable.connect() 0 3 1
A CompleteHandler.__call__() 0 3 1
A SubjectFactory.__call__() 0 2 1
A Observer.on_completed() 0 3 1
A ConnectableObservableHandler.on_disconnect() 0 3 1
A ErrorHandler.__call__() 0 11 1
A SubjectHandler.on_unsubscribe() 0 9 1
A Observer.on_next() 0 3 1
A Collector.result() 0 3 1
A NextHandler.__call__() 0 3 1
A SubjectHandler.on_subscribe() 0 9 1
1
"""Protocol definition."""
2
import sys
3
from typing import Any, NoReturn, Optional, TypeVar, Union
4
5
# Protocol is only available in the standard library from Python 3.8 onwards.
# Compare the whole version tuple rather than just ``minor``: a bare minor
# check ignores the major version and is not a form that static type checkers
# recognise when narrowing version-dependent imports.
if sys.version_info >= (3, 8):  # pragma: no cover
    from typing import Protocol
else:  # pragma: no cover
    from typing_extensions import Protocol  # type: ignore
10
11
# Public names exported by ``from <module> import *``.
__all__ = [
    "Subscription",
    "NextHandler",
    "CompleteHandler",
    "ErrorHandler",
    "Observable",
    "Observer",
    "Collector",
    "Subscribe",
    "Subject",
    "ConnectHandler",
    "RefCountHandler",
    "ConnectableObservable",
    "ObservableFactory",
    "SubjectEventHandler",
    "SubjectHandler",
    "ConnectableObservableEventHandler",
    "ConnectableObservableHandler",
    "PredicateOperator",
    "AccumulatorOperator",
    "SubjectFactory",
]
33
34
# Generic type variable used by the generic (``Protocol[T]``) definitions below.
T = TypeVar("T")
35
36
37
class Subscription(Protocol):
    """Subscription Protocol.

    A Subscription is an async callable used to release resources or cancel
    an Observable execution (it acts as a Disposable).
    It defines something to be used and thrown away once it has been called.
    """

    async def __call__(self) -> None:  # pragma: no cover
        """Release subscription."""
        ...
47
48
49
class NextHandler(Protocol):
    """NextHandler Protocol.

    A next handler processes an item coming from the associated observable.
    """

    async def __call__(self, item: Any) -> None:  # pragma: no cover
        """Process item.

        Args:
            item (Any): the item to process

        """
        ...
58
59
60
class CompleteHandler(Protocol):
    """CompleteHandler Protocol.

    A complete handler is called when no more items will come from the
    associated observable.
    """

    async def __call__(self) -> None:  # pragma: no cover
        """Signal completion of this observable."""
        ...
69
70
71
class ErrorHandler(Protocol):
    """ErrorHandler Protocol.

    An error handler receives a message or an exception and raises it.
    """

    async def __call__(self, err: Any) -> Optional[NoReturn]:  # pragma: no cover
        """Raise error.

        Args:
            err (Any): the error (a message or an exception) to raise

        Raises:
            (Exception): the exception

        """
        ...
88
89
90
class Observer(Protocol):
    """Observer Protocol.

    What is an Observer?

    An Observer is a consumer of values delivered by an Observable.

    Observers are simply a set of callbacks, one for each type of notification
    delivered by the Observable:

    - next,
    - error,
    - and complete.

    Observers are just "objects" with three callbacks, one for each type of
    notification that an Observable may deliver.
    """

    async def on_next(self, item: Any) -> None:  # pragma: no cover
        """Process item.

        Args:
            item (Any): the item delivered by the observable

        """
        ...

    async def on_completed(self) -> None:  # pragma: no cover
        """Signal completion of this observable."""
        ...

    async def on_error(self, err: Any) -> Optional[NoReturn]:  # pragma: no cover
        """Handle an error notification.

        Args:
            err (Any): the error delivered by the observable

        """
        ...
118
119
120
class Collector(Observer, Protocol):
    """Collector Observer Protocol.

    An Observer which additionally exposes accessors for a result,
    a completion flag and an error.
    """

    def result(self) -> Any:  # pragma: no cover
        """Return the result."""
        ...

    def is_finish(self) -> bool:  # pragma: no cover
        """Return true if observable has completed."""
        ...

    def has_error(self) -> bool:  # pragma: no cover
        """Return true if observable has met an error."""
        ...

    def error(self) -> Any:  # pragma: no cover
        """Return the error if observable has met one."""
        ...
138
139
140
class Subscribe(Protocol):
    """Subscribe Protocol.

    It's an async function which takes an observer and returns a subscription.
    """

    async def __call__(self, an_observer: Observer) -> Subscription:  # pragma: no cover
        """Implement observer subscription.

        Args:
            an_observer (Observer): the observer instance

        Returns:
            (Subscription): subscription

        """
        ...
157
158
159
class Observable(Protocol):
    """Observable Protocol.

    An observable is something to which we can subscribe to listen to events.
    """

    async def subscribe(self, an_observer: Observer) -> Subscription:  # pragma: no cover
        """Subscribe an observer to this observable.

        Args:
            an_observer (Observer): the observer instance

        Returns:
            (Subscription): subscription

        """
        ...
167
168
169
class ObservableFactory(Protocol):
    """Async ObservableFactory Protocol.

    Define an async function which creates an Observable.
    """

    async def __call__(self) -> Observable:  # pragma: no cover
        """Create an Observable.

        Returns:
            (Observable): the new observable instance.

        """
        ...
183
184
185
class Subject(Observable, Observer, Protocol):
    """A Subject is like an Observable, but can multicast to many Observers.

    Subjects are like EventEmitters: they maintain a registry of many listeners.

    The protocol adds no member of its own: it is the combination of the
    Observable and Observer protocols.
    """

    pass
192
193
194
class SubjectEventHandler(Protocol):
    """Subject Event Handler Protocol."""

    async def __call__(self, count: int, source: Observer) -> None:  # pragma: no cover
        """Handle a subject event.

        Args:
            count (int): presumably the current #subscribers, as documented
                on SubjectHandler callbacks (to be confirmed against callers)
            source (Observer): observer source

        """
        ...
199
200
201
class SubjectHandler(Protocol):
    """Subject Handler Protocol.

    This handler could be called on subscribe/unsubscribe events.
    """

    async def on_subscribe(self, count: int, source: Observer) -> None:  # pragma: no cover
        """Notify on subscribe event.

        Args:
            count (int): current #subscribers after subscription
            source (Observer): observer source

        """
        ...

    async def on_unsubscribe(self, count: int, source: Observer) -> None:  # pragma: no cover
        """Notify on unsubscribe event.

        Args:
            count (int): current #subscribers after unsubscribe
            source (Observer): observer source

        """
        ...
226
227
228
class SubjectFactory(Protocol):
    """SubjectFactory Protocol.

    Define a (sync) function which creates a Subject.
    """

    def __call__(self, subject_handler: Optional[SubjectHandler] = None) -> Subject:  # pragma: no cover
        """Create a Subject.

        Args:
            subject_handler (Optional[SubjectHandler]): optional handler,
                defaults to None; presumably notified of subscribe/unsubscribe
                events on the created subject (to be confirmed against
                implementations)

        Returns:
            (Subject): the new subject instance.

        """
        ...
231
232
233
class ConnectHandler(Protocol):
    """Connect Handler Protocol."""

    async def __call__(self) -> Subscription:  # pragma: no cover
        """Handle a connect request.

        Returns:
            (Subscription): subscription

        """
        ...
238
239
240
class RefCountHandler(Protocol):
    """RefCount Handler Protocol."""

    async def __call__(self) -> Observable:  # pragma: no cover
        """Handle a ref_count request.

        Returns:
            (Observable): an observable instance

        """
        ...
245
246
247
class ConnectableObservable(Observable, Protocol):
    """Define a connectable observable protocol.

    We have :
        - subscribe function (it's an observable)
        - connect function: start executing
        - ref_count function: makes the Observable automatically start executing
            when the first subscriber arrives,
            and stop executing when the last subscriber leaves.
    """

    async def connect(self) -> Subscription:  # pragma: no cover
        """Connect.

        Returns:
            (Subscription): subscription

        """
        ...

    async def ref_count(self) -> Observable:  # pragma: no cover
        """Reference counter.

        Make the multicasted Observable automatically start executing when
        the first subscriber arrives,
        and stop executing when the last subscriber leaves.

        Returns:
            (Observable): an observable instance

        """
        ...
270
271
272
class ConnectableObservableEventHandler(Protocol):
    """Connectable Observable Event Handler Protocol."""

    async def __call__(self) -> None:  # pragma: no cover
        """Handle a connectable observable event."""
        ...
277
278
279
class ConnectableObservableHandler(Protocol):
    """Connectable Observable Handler Protocol.

    This handler could be called on connect/disconnect events.
    """

    async def on_connect(self) -> None:  # pragma: no cover
        """Called on connect event."""
        ...

    async def on_disconnect(self) -> None:  # pragma: no cover
        """Called on disconnect event."""
        ...
292
293
294
class _AsyncAccumulatorOperator(Protocol[T]):
    """Async Accumulator Operator Protocol.

    Accumulators are used in reduce operations.
    """

    async def __call__(self, buffer: T, item: T) -> T:  # pragma: no cover
        """Combine the accumulated value with a new item.

        Args:
            buffer (T): the value accumulated so far
            item (T): the item to accumulate

        Returns:
            (T): the new accumulated value

        """
        ...


class _SyncAccumulatorOperator(Protocol[T]):
    """Sync Accumulator Operator Protocol.

    Accumulators are used in reduce operations.
    """

    def __call__(self, buffer: T, item: T) -> T:  # pragma: no cover
        """Combine the accumulated value with a new item.

        Args:
            buffer (T): the value accumulated so far
            item (T): the item to accumulate

        Returns:
            (T): the new accumulated value

        """
        ...


AccumulatorOperator = Union[_AsyncAccumulatorOperator, _SyncAccumulatorOperator]
"""Accumulator Operator Protocol.

Accumulators are used in reduce operations; both async and sync callables
are accepted.
"""
319
320
321
class _AsyncPredicateOperator(Protocol):
    """Async Predicate Operator Protocol.

    Predicates are used in filter operations.
    """

    async def __call__(self, item: Any) -> bool:  # pragma: no cover
        """Evaluate the predicate on an item.

        Args:
            item (Any): the item to test

        Returns:
            (bool): the predicate result

        """
        ...


class _SyncPredicateOperator(Protocol):
    """Sync Predicate Operator Protocol.

    Predicates are used in filter operations.
    """

    def __call__(self, item: Any) -> bool:  # pragma: no cover
        """Evaluate the predicate on an item.

        Args:
            item (Any): the item to test

        Returns:
            (bool): the predicate result

        """
        ...


PredicateOperator = Union[_AsyncPredicateOperator, _SyncPredicateOperator]
"""Predicate Operator Protocol.

Predicates are used in filter operations; both async and sync callables
are accepted.
"""
346