async_rx.subject.rx_subject_from   A
last analyzed

Complexity

Total Complexity 5

Size/Duplication

Total Lines 33
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 5
eloc 15
dl 0
loc 33
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_subject_from() 0 25 5
1
from typing import Optional
2
3
from ..protocol import CompleteHandler, ErrorHandler, NextHandler, Subject, Subscribe, subject
4
5
__all__ = ["rx_subject_from"]
6
7
8
def rx_subject_from(
    a_subject: Subject,
    subscribe: Optional[Subscribe] = None,
    on_next: Optional[NextHandler] = None,
    on_error: Optional[ErrorHandler] = None,
    on_completed: Optional[CompleteHandler] = None,
) -> Subject:
    """Build a subject from another one, overriding some of its functions.

    Every function left as ``None`` is taken from ``a_subject``; every
    function that is supplied replaces the corresponding one. The selected
    functions are passed to ``subject`` to build the result.

    Args:
        a_subject (Subject): the source subject
        subscribe (Optional[Subscribe]): override subscribe if set
        on_next (Optional[NextHandler]): override on_next if set
        on_error (Optional[ErrorHandler]): override on_error if set
        on_completed (Optional[CompleteHandler]): override on_completed if set

    Returns:
        (Subject): a new subject

    """
    # Compare against None explicitly rather than relying on truthiness: a
    # supplied handler may be a callable object that evaluates as falsy
    # (e.g. it defines __bool__ or __len__) and must still take precedence.
    return subject(
        subscribe=a_subject.subscribe if subscribe is None else subscribe,
        on_next=a_subject.on_next if on_next is None else on_next,
        on_error=a_subject.on_error if on_error is None else on_error,
        on_completed=a_subject.on_completed if on_completed is None else on_completed,
    )
34