1 | <?php declare(strict_types=1); |
||
15 | final class ObservableBunny |
||
16 | { |
||
17 | /** |
||
18 | * @var LoopInterface |
||
19 | */ |
||
20 | private $loop; |
||
21 | |||
22 | /** |
||
23 | * @var Client |
||
24 | */ |
||
25 | private $bunny; |
||
26 | |||
27 | /** |
||
28 | * @param LoopInterface $loop |
||
29 | * @param Client $bunny |
||
30 | */ |
||
31 | 4 | public function __construct(LoopInterface $loop, Client $bunny) |
|
36 | |||
37 | 4 | public function consume( |
|
38 | string $queue = '', |
||
39 | string $consumerTag = '', |
||
40 | bool $noLocal = false, |
||
41 | bool $noAck = false, |
||
42 | bool $exclusive = false, |
||
43 | bool $nowait = false, |
||
44 | array $arguments = [] |
||
45 | ): Subject { |
||
46 | 4 | $subject = new Subject(); |
|
47 | 4 | $consumeArgs = [$queue, $consumerTag, $noLocal, $noAck, $exclusive, $nowait, $arguments]; |
|
48 | |||
49 | 4 | $channel = $this->bunny->channel(); |
|
50 | 4 | $channel->then(function (Channel $channel) use ($subject, $consumeArgs) { |
|
|
|||
51 | /** @var string $consumerTag */ |
||
52 | 3 | $consumerTag = null; |
|
53 | 3 | $timer = $this->loop->addPeriodicTimer(1, function () use ($channel, $subject, &$timer, &$consumerTag) { |
|
54 | 1 | if (!$subject->isDisposed()) { |
|
55 | return; |
||
56 | } |
||
57 | |||
58 | 1 | $this->cancelSubscription( |
|
59 | 1 | $timer, |
|
60 | 1 | $channel, |
|
61 | 1 | $consumerTag |
|
62 | 1 | )->done([$subject, 'onComplete'], $this->onError($subject, $timer)); |
|
63 | 3 | }); |
|
64 | 3 | $channel->consume( |
|
65 | 3 | function (BunnyMessage $message, Channel $channel) use ($subject, &$timer, &$consumerTag) { |
|
66 | 2 | if ($subject->isDisposed()) { |
|
67 | 1 | $channel->nack($message); |
|
68 | 1 | $this->cancelSubscription( |
|
69 | 1 | $timer, |
|
70 | 1 | $channel, |
|
71 | 1 | $consumerTag |
|
72 | 1 | )->done([$subject, 'onComplete'], $this->onError($subject, $timer)); |
|
73 | |||
74 | 1 | return; |
|
75 | } |
||
76 | |||
77 | 1 | $subject->onNext(new Message($message, $channel)); |
|
78 | 3 | }, |
|
79 | 3 | ...$consumeArgs |
|
80 | 3 | )->then(function (MethodBasicConsumeOkFrame $response) use (&$consumerTag) { |
|
81 | 2 | $consumerTag = $response->consumerTag; |
|
82 | 3 | })->done(null, $this->onError($subject, $timer)); |
|
83 | 4 | })->done(null, [$subject, 'onError']); |
|
84 | |||
85 | 4 | return $subject; |
|
86 | } |
||
87 | |||
88 | 2 | private function cancelSubscription(TimerInterface $timer, Channel $channel, string $consumerTag): PromiseInterface |
|
96 | |||
97 | private function onError(Subject $subject, TimerInterface $timer): callable |
||
106 | } |
||
107 |
It seems like the method you are trying to call exists only in some of the possible types.
Let’s take a look at an example:
Available Fixes
Add an additional type-check:
Only allow a single type to be passed if the variable comes from a parameter: