| @@ 10-27 (lines=18) @@ | ||
| 7 | use Rx\ObserverInterface; |
|
| 8 | use Rx\Operator\OperatorInterface; |
|
| 9 | ||
final class JsonDecodeOperator implements OperatorInterface
{
    /**
     * Subscribes to the source and forwards every emitted JSON string, decoded, to the observer.
     *
     * Each string is decoded with associative arrays enabled, so JSON objects
     * reach the observer as PHP arrays. A payload that cannot be decoded is
     * reported through the observer's onError() rather than being emitted as
     * null, which would be indistinguishable from a legitimate JSON `null`.
     * Errors and completion coming from the source are passed through as-is.
     *
     * @param ObservableInterface $observable Source emitting JSON-encoded strings.
     * @param ObserverInterface   $observer   Receiver of the decoded values.
     *
     * @return DisposableInterface Whatever the source's subscribe() call returns.
     */
    public function __invoke(
        ObservableInterface $observable,
        ObserverInterface $observer
    ): DisposableInterface {
        return $observable->subscribe(
            // Static: the callback never touches $this, so no instance binding is needed.
            static function (string $json) use ($observer): void {
                $decoded = \json_decode($json, true);

                // Without JSON_THROW_ON_ERROR, json_decode() reports failure only
                // through json_last_error(); a null result alone is ambiguous
                // because the literal JSON document `null` also decodes to null.
                $errorCode = \json_last_error();
                if (\JSON_ERROR_NONE !== $errorCode) {
                    $observer->onError(
                        new \UnexpectedValueException(
                            'Unable to decode JSON: ' . \json_last_error_msg(),
                            $errorCode
                        )
                    );

                    return;
                }

                $observer->onNext($decoded);
            },
            [$observer, 'onError'],
            [$observer, 'onCompleted']
        );
    }
}
|
| 28 | ||
| @@ 10-27 (lines=18) @@ | ||
| 7 | use Rx\ObserverInterface; |
|
| 8 | use Rx\Operator\OperatorInterface; |
|
| 9 | ||
final class JsonEncodeOperator implements OperatorInterface
{
    /**
     * Subscribes to the source and forwards every emitted array, JSON-encoded, to the observer.
     *
     * Each array is serialised with json_encode() using its default flags. When
     * encoding fails, the failure is reported through the observer's onError()
     * instead of emitting the boolean false that json_encode() returns in that
     * case. Errors and completion coming from the source are passed through as-is.
     *
     * @param ObservableInterface $observable Source emitting arrays to serialise.
     * @param ObserverInterface   $observer   Receiver of the resulting JSON strings.
     *
     * @return DisposableInterface Whatever the source's subscribe() call returns.
     */
    public function __invoke(
        ObservableInterface $observable,
        ObserverInterface $observer
    ): DisposableInterface {
        return $observable->subscribe(
            // Static: the callback never touches $this, so no instance binding is needed.
            static function (array $json) use ($observer): void {
                $encoded = \json_encode($json);

                // json_encode() returns false (not a string) on failure; pushing
                // that downstream would hand consumers a non-JSON value.
                if (false === $encoded) {
                    $observer->onError(
                        new \UnexpectedValueException(
                            'Unable to encode JSON: ' . \json_last_error_msg(),
                            \json_last_error()
                        )
                    );

                    return;
                }

                $observer->onNext($encoded);
            },
            [$observer, 'onError'],
            [$observer, 'onCompleted']
        );
    }
}
|
| 28 | ||