async_rx.multicast.rx_publish.rx_publish()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 106
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric  Value
eloc    50
dl      0
loc     106
rs      2.9998
c       0
b       0
f       0
cc      15
nop     4

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
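The extraction described above can be sketched as follows. This is a generic illustration, not code from async_rx: the function names (`report_before`, `total_with_tax`, `format_total`, `report_after`) and the 20 % tax rate are assumptions made up for the example. Each comment in the long version becomes the name of a small function.

```python
from typing import List


def report_before(prices: List[float]) -> str:
    """Long version: comments mark the parts that could be extracted."""
    # compute the total with tax
    total = 0.0
    for price in prices:
        total += price * 1.2
    # format the total
    return f"Total: {total:.2f}"


def total_with_tax(prices: List[float], rate: float = 0.2) -> float:
    """Extracted from the 'compute the total with tax' comment."""
    return sum(price * (1 + rate) for price in prices)


def format_total(total: float) -> str:
    """Extracted from the 'format the total' comment."""
    return f"Total: {total:.2f}"


def report_after(prices: List[float]) -> str:
    """Short version: reads as a sequence of named steps."""
    return format_total(total_with_tax(prices))
```

Both versions return the same string for the same input, so the refactoring can be checked by comparing their results.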

A commonly applied refactoring for this case is Extract Method, as described above.

Complexity

Complex classes, and functions such as async_rx.multicast.rx_publish.rx_publish(), often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods (here, closure variables and nested functions) that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
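As a rough sketch of what Extract Class could look like for this function, the two closure variables sharing the `_ref_count` prefix could be grouped into one small object. The class name `RefCounter` and its methods are hypothetical and not part of async_rx. The sketch also leaves the additional `_subscription` checks of the original code to the caller.

```python
class RefCounter:
    """Hypothetical helper grouping the ref-count state (not part of async_rx)."""

    def __init__(self) -> None:
        self.activated = False  # auto-connect flag
        self.count = 0  # number of current subscribers

    def activate(self) -> None:
        """Enable auto-connect / auto-disconnect."""
        self.activated = True

    def increment(self) -> bool:
        """Register a subscriber; True if it is the first one and auto-connect is on."""
        self.count += 1
        return self.activated and self.count == 1

    def decrement(self) -> bool:
        """Remove a subscriber; True if it was the last one and auto-connect is on."""
        self.count -= 1
        return self.activated and self.count == 0
```

With such a helper, the subscribe and unsubscribe callbacks would only need to ask the counter whether to connect or disconnect, which removes two of the `nonlocal` names from the enclosing function.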

from typing import Optional

from ..observable import rx_create
from ..protocol import (
    ConnectableObservable,
    ConnectableObservableHandler,
    Observable,
    Observer,
    Subject,
    SubjectFactory,
    SubjectHandler,
    Subscription,
    connectable_observable,
)
from ..protocol import subject_handler as _subject_handler
from ..subject import rx_subject

__all__ = ["rx_publish"]


def rx_publish(
    an_observable: Observable,
    subject_handler: Optional[SubjectHandler] = None,
    connection_handler: Optional[ConnectableObservableHandler] = None,
    subject_factory: SubjectFactory = rx_subject,
) -> ConnectableObservable:
    """Create a Connectable Observable.

    A multicasted Observable (rx_publish) uses a Subject under the hood to make multiple
    Observers see the same Observable execution.

    Args:
        an_observable (Observable): observable to connect
        subject_handler (Optional[SubjectHandler]): optional subject handler
        connection_handler (Optional[ConnectableObservableHandler]): optional connection handler
        subject_factory (SubjectFactory): subject factory, defaults to rx_subject

    Returns:
        (ConnectableObservable): the multicasted Observable instance

    """
    _ref_count_activated = False  # Flag to enable auto-connect
    _ref_count = 0  # subscription count (used for auto-connect)
    _subscription: Optional[Subscription] = None  # observable subscription
    _connectable_observable: Optional[ConnectableObservable] = None  # for ref_count return value
    _subject: Optional[Subject] = None

    async def _unsubscribe() -> None:
        nonlocal _subscription
        # Analyzer note (Comprehensibility, Best Practice): The variable _subscription does not seem to be defined.

        if _subscription:
            await _subscription()
            # notify
            if connection_handler:
                await connection_handler.on_disconnect()

            _subscription = None

    async def _connect() -> Subscription:
        """Connection Handler implementation."""
        nonlocal _subscription, _subject
        # Analyzer note (Comprehensibility, Best Practice): The variable _subscription does not seem to be defined.

        if _subscription:
            return _unsubscribe

        if not _subject:  # pragma: no cover
            # never reached
            raise RuntimeError("unexpected error")

        _subscription = await an_observable.subscribe(an_observer=_subject)

        if connection_handler:
            await connection_handler.on_connect()

        return _unsubscribe

    async def _on_subscribe(count: int, source: Observer) -> None:
        nonlocal _subscription, _ref_count_activated, _ref_count
        # Analyzer note (Comprehensibility, Best Practice): The variable _ref_count does not seem to be defined.

        _ref_count += 1

        # forward event
        if subject_handler:
            await subject_handler.on_subscribe(count=count, source=source)

        # auto connect
        if _ref_count_activated and _subscription is None and _ref_count == 1:
            await _connect()

    async def _on_unsubscribe(count: int, source: Observer) -> None:
        nonlocal _subscription, _ref_count_activated, _ref_count
        # Analyzer note (Comprehensibility, Best Practice): The variable _ref_count does not seem to be defined.

        _ref_count -= 1

        # forward event
        if subject_handler:
            await subject_handler.on_unsubscribe(count=count, source=source)

        # auto disconnect
        if _ref_count_activated and _subscription and _ref_count == 0:
            await _unsubscribe()

    # our multicast subject used under the hood
    _subject = subject_factory(subject_handler=_subject_handler(on_subscribe=_on_subscribe, on_unsubscribe=_on_unsubscribe))

    async def _ref_count_handler() -> Observable:
        """Autostart the multicasted observable.

        ref_count makes the multicasted Observable automatically start executing when
        the first subscriber arrives,
        and stop executing when the last subscriber leaves.
        """
        nonlocal _ref_count_activated, _connectable_observable
        # Analyzer note (Comprehensibility, Best Practice): The variable _ref_count_activated does not seem to be defined.

        if not _connectable_observable:  # pragma: no cover
            # never reached
            raise RuntimeError("unexpected error")

        _ref_count_activated = True

        return rx_create(subscribe=_connectable_observable.subscribe)

    # our connectable observable
    _connectable_observable = connectable_observable(connect=_connect, ref_count=_ref_count_handler, subscribe=_subject.subscribe)

    return _connectable_observable
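To illustrate the idea stated in the docstring (several observers sharing one execution once the source is connected), here is a deliberately simplified, self-contained sketch. It does not use async_rx and does not reproduce its API: `mini_publish`, `demo`, and the observer callbacks are hypothetical names, and observers are plain async callables rather than `Observer` objects. Like the listing, it keeps state in closure variables and rebinds one of them with `nonlocal`.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

ObserverFn = Callable[[int], Awaitable[None]]


def mini_publish(
    values: List[int],
) -> Tuple[Callable[[ObserverFn], Awaitable[None]], Callable[[], Awaitable[int]]]:
    """Simplified stand-in (not async_rx): share one execution among observers."""
    observers: List[ObserverFn] = []
    executions = 0  # how many times the source has been run

    async def subscribe(observer: ObserverFn) -> None:
        # Subscribing only registers the observer; nothing is emitted yet.
        observers.append(observer)

    async def connect() -> int:
        nonlocal executions  # rebinds the variable of the enclosing function
        executions += 1
        # One pass over the source, each value forwarded to every observer.
        for value in values:
            for observer in observers:
                await observer(value)
        return executions

    return subscribe, connect


async def demo() -> Tuple[List[int], List[int], int]:
    seen_a: List[int] = []
    seen_b: List[int] = []

    async def observer_a(value: int) -> None:
        seen_a.append(value)

    async def observer_b(value: int) -> None:
        seen_b.append(value)

    subscribe, connect = mini_publish([1, 2, 3])
    await subscribe(observer_a)
    await subscribe(observer_b)
    executions = await connect()
    return seen_a, seen_b, executions
```

Running `asyncio.run(demo())` delivers the same values to both observers from a single pass over the source. The real function additionally handles disconnection, handler notifications, and ref-count based auto-connect, which this sketch leaves out.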