@@ 11-29 (lines=19) @@ | ||
8 | use Rx\Operator\OperatorInterface; |
|
9 | use Rx\SchedulerInterface; |
|
10 | ||
/**
 * Operator that JSON-decodes every string emitted by the source observable.
 *
 * JSON objects are decoded into associative arrays (json_decode() is called
 * with $assoc = true). Errors and completion coming from the source are
 * relayed to the downstream observer unchanged.
 */
final class JsonDecodeOperator implements OperatorInterface
{
    /**
     * Subscribes to the source and forwards each item after decoding it.
     *
     * A string that is not valid JSON is reported through the downstream
     * observer's onError() rather than being forwarded as a silent null.
     *
     * @param ObservableInterface     $observable Source emitting JSON strings.
     * @param ObserverInterface       $observer   Downstream observer receiving the decoded values.
     * @param SchedulerInterface|null $scheduler  Not used by this operator.
     *
     * @return mixed Whatever $observable->subscribe() returns.
     */
    public function __invoke(
        ObservableInterface $observable,
        ObserverInterface $observer,
        SchedulerInterface $scheduler = null
    ) {
        return $observable->subscribe(new CallbackObserver(
            function (string $json) use ($observer) {
                $decoded = json_decode($json, true);

                // A null result is ambiguous: it is also what the valid
                // document "null" decodes to. The error state is the only
                // reliable way to tell a failure apart from real data.
                if (JSON_ERROR_NONE !== json_last_error()) {
                    $observer->onError(new \RuntimeException(
                        'Unable to decode JSON: ' . json_last_error_msg(),
                        json_last_error()
                    ));

                    return;
                }

                $observer->onNext($decoded);
            },
            [$observer, 'onError'],
            [$observer, 'onCompleted']
        ));
    }
}
|
30 |
@@ 11-29 (lines=19) @@ | ||
8 | use Rx\Operator\OperatorInterface; |
|
9 | use Rx\SchedulerInterface; |
|
10 | ||
/**
 * Operator that JSON-encodes every array emitted by the source observable.
 *
 * Errors and completion coming from the source are relayed to the
 * downstream observer unchanged.
 */
final class JsonEncodeOperator implements OperatorInterface
{
    /**
     * Subscribes to the source and forwards each item as a JSON string.
     *
     * An array that cannot be encoded is reported through the downstream
     * observer's onError() rather than being forwarded as boolean false.
     *
     * @param ObservableInterface     $observable Source emitting arrays.
     * @param ObserverInterface       $observer   Downstream observer receiving the JSON strings.
     * @param SchedulerInterface|null $scheduler  Not used by this operator.
     *
     * @return mixed Whatever $observable->subscribe() returns.
     */
    public function __invoke(
        ObservableInterface $observable,
        ObserverInterface $observer,
        SchedulerInterface $scheduler = null
    ) {
        return $observable->subscribe(new CallbackObserver(
            function (array $json) use ($observer) {
                $encoded = json_encode($json);

                // json_encode() signals failure by returning false; emitting
                // that downstream would masquerade as a successfully encoded item.
                if (false === $encoded) {
                    $observer->onError(new \RuntimeException(
                        'Unable to encode JSON: ' . json_last_error_msg(),
                        json_last_error()
                    ));

                    return;
                }

                $observer->onNext($encoded);
            },
            [$observer, 'onError'],
            [$observer, 'onCompleted']
        ));
    }
}
|
30 |