async_rx.protocol.subject   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 48
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 2
eloc 12
dl 0
loc 48
rs 10
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
A subject_handler() 0 11 1
A subject() 0 19 1
1
"""Subject utilities."""
2
from collections import namedtuple
3
4
from .definition import CompleteHandler, ErrorHandler, NextHandler, Subject, SubjectEventHandler, SubjectHandler, Subscribe
5
from .observer import default_error, default_on_completed
6
7
# Names exported by a star-import of this module: the two factory functions below.
__all__ = ["subject_handler", "subject"]
8
9
# Plain immutable record holding the four callables of a subject; instances
# are created by the ``subject`` factory.
SubjectDefinition = namedtuple(
    "SubjectDefinition",
    ["subscribe", "on_next", "on_error", "on_completed"],
)
"""Implements Subject Protocol."""


# Plain immutable record holding the two subscription event callbacks;
# instances are created by the ``subject_handler`` factory.
SubjectHandlerDefinition = namedtuple(
    "SubjectHandlerDefinition",
    ["on_subscribe", "on_unsubscribe"],
)
"""Implements SubjectHandler Protocol."""
14
15
16
def subject(
    subscribe: Subscribe,
    on_next: NextHandler,
    on_error: ErrorHandler = default_error,
    on_completed: CompleteHandler = default_on_completed,
) -> Subject:
    """Build a subject.

    The underlying implementation uses a named tuple
    (``SubjectDefinition``) that simply bundles the given callables.

    Args:
        subscribe (Subscribe): subscription handler.
        on_next (NextHandler): on_next handler which processes items.
        on_error (ErrorHandler): on_error handler (defaults to
            ``default_error``, which raises an exception).
        on_completed (CompleteHandler): on_completed handler (defaults to
            ``default_on_completed``, a no-op).

    Returns:
        (Subject): a subject.

    """
    return SubjectDefinition(
        subscribe=subscribe,
        on_next=on_next,
        on_error=on_error,
        on_completed=on_completed,
    )
35
36
37
def subject_handler(on_subscribe: SubjectEventHandler, on_unsubscribe: SubjectEventHandler) -> SubjectHandler:
    """Create a SubjectHandler.

    The two callbacks are bundled, unchanged, into a
    ``SubjectHandlerDefinition`` named tuple.

    Args:
        on_subscribe (SubjectEventHandler): on subscribe event handler.
        on_unsubscribe (SubjectEventHandler): on unsubscribe event handler.

    Returns:
        (SubjectHandler): a SubjectHandler instance.

    """
    return SubjectHandlerDefinition(on_subscribe=on_subscribe, on_unsubscribe=on_unsubscribe)
48