This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | /** |
||
3 | * Client.php |
||
4 | * |
||
5 | * @copyright More in license.md |
||
6 | * @license http://www.ipublikuj.eu |
||
7 | * @author Adam Kadlec http://www.ipublikuj.eu |
||
8 | * @package iPublikuj:MQTTClient! |
||
9 | * @subpackage Client |
||
10 | * @since 1.0.0 |
||
11 | * |
||
12 | * @date 12.03.17 |
||
13 | */ |
||
14 | |||
15 | declare(strict_types = 1); |
||
16 | |||
17 | namespace IPub\MQTTClient\Client; |
||
18 | |||
19 | use Closure; |
||
20 | use Exception; |
||
21 | |||
22 | use Nette; |
||
23 | |||
24 | use React\EventLoop; |
||
25 | use React\Promise; |
||
26 | use React\Socket; |
||
27 | use React\Stream; |
||
28 | |||
29 | use BinSoul\Net\Mqtt; |
||
30 | |||
31 | use IPub\MQTTClient\Configuration; |
||
32 | use IPub\MQTTClient\Exceptions; |
||
33 | use IPub\MQTTClient\Flow; |
||
34 | |||
35 | /** |
||
36 | * Connection client |
||
37 | * |
||
38 | * @package iPublikuj:MQTTClient! |
||
39 | * @subpackage Client |
||
40 | * |
||
41 | * @author Adam Kadlec <[email protected]> |
||
42 | * |
||
43 | * @method onStart() |
||
44 | * @method onOpen(Mqtt\Connection $connection, IClient $client) |
||
45 | * @method onConnect(Mqtt\Connection $connection, IClient $client) |
||
46 | * @method onDisconnect(Mqtt\Connection $connection, IClient $client) |
||
47 | * @method onClose(Mqtt\Connection $connection, IClient $client) |
||
48 | * @method onPing(IClient $client) |
||
49 | * @method onPong(IClient $client) |
||
50 | * @method onPublish(Mqtt\Message $message, IClient $client) |
||
51 | * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client) |
||
52 | * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client) |
||
53 | * @method onMessage(Mqtt\Message $message, IClient $client) |
||
54 | * @method onWarning(Exception $ex, IClient $client) |
||
55 | * @method onError(Exception $ex, IClient $client) |
||
56 | */ |
||
57 | 1 | final class Client implements IClient |
|
58 | { |
||
59 | /** |
||
60 | * Implement nette smart magic |
||
61 | */ |
||
62 | 1 | use Nette\SmartObject; |
|
63 | |||
64 | /** |
||
65 | * @var Closure[] |
||
66 | */ |
||
67 | public $onStart = []; |
||
68 | |||
69 | /** |
||
70 | * @var Closure[] |
||
71 | */ |
||
72 | public $onOpen = []; |
||
73 | |||
74 | /** |
||
75 | * @var Closure[] |
||
76 | */ |
||
77 | public $onConnect = []; |
||
78 | |||
79 | /** |
||
80 | * @var Closure[] |
||
81 | */ |
||
82 | public $onDisconnect = []; |
||
83 | |||
84 | /** |
||
85 | * @var Closure[] |
||
86 | */ |
||
87 | public $onClose = []; |
||
88 | |||
89 | /** |
||
90 | * @var Closure[] |
||
91 | */ |
||
92 | public $onPing = []; |
||
93 | |||
94 | /** |
||
95 | * @var Closure[] |
||
96 | */ |
||
97 | public $onPong = []; |
||
98 | |||
99 | /** |
||
100 | * @var Closure[] |
||
101 | */ |
||
102 | public $onPublish = []; |
||
103 | |||
104 | /** |
||
105 | * @var Closure[] |
||
106 | */ |
||
107 | public $onSubscribe = []; |
||
108 | |||
109 | /** |
||
110 | * @var Closure[] |
||
111 | */ |
||
112 | public $onUnsubscribe = []; |
||
113 | |||
114 | /** |
||
115 | * @var Closure[] |
||
116 | */ |
||
117 | public $onMessage = []; |
||
118 | |||
119 | /** |
||
120 | * @var Closure[] |
||
121 | */ |
||
122 | public $onWarning = []; |
||
123 | |||
124 | /** |
||
125 | * @var Closure[] |
||
126 | */ |
||
127 | public $onError = []; |
||
128 | |||
129 | /** |
||
130 | * @var EventLoop\LoopInterface |
||
131 | */ |
||
132 | private $loop; |
||
133 | |||
134 | /** |
||
135 | * @var Configuration\Broker |
||
136 | */ |
||
137 | private $configuration; |
||
138 | |||
139 | /** |
||
140 | * @var Socket\ConnectorInterface |
||
141 | */ |
||
142 | private $connector; |
||
143 | |||
144 | /** |
||
145 | * @var Socket\ConnectionInterface|NULL |
||
146 | */ |
||
147 | private $stream = NULL; |
||
148 | |||
149 | /** |
||
150 | * @var Mqtt\Connection |
||
151 | */ |
||
152 | private $connection; |
||
153 | |||
154 | /** |
||
155 | * @var Mqtt\StreamParser |
||
156 | */ |
||
157 | private $parser; |
||
158 | |||
159 | /** |
||
160 | * @var Mqtt\IdentifierGenerator |
||
161 | */ |
||
162 | private $identifierGenerator; |
||
163 | |||
164 | /** |
||
165 | * @var bool |
||
166 | */ |
||
167 | private $isConnected = FALSE; |
||
168 | |||
169 | /** |
||
170 | * @var bool |
||
171 | */ |
||
172 | private $isConnecting = FALSE; |
||
173 | |||
174 | /** |
||
175 | * @var bool |
||
176 | */ |
||
177 | private $isDisconnecting = FALSE; |
||
178 | |||
179 | /** |
||
180 | * @var string |
||
181 | */ |
||
182 | private $host; |
||
183 | |||
184 | /** |
||
185 | * @var int |
||
186 | */ |
||
187 | private $port; |
||
188 | |||
189 | /** |
||
190 | * @var int |
||
191 | */ |
||
192 | private $timeout = 5; |
||
193 | |||
194 | /** |
||
195 | * @var Flow\Envelope[] |
||
196 | */ |
||
197 | private $receivingFlows = []; |
||
198 | |||
199 | /** |
||
200 | * @var Flow\Envelope[] |
||
201 | */ |
||
202 | private $sendingFlows = []; |
||
203 | |||
204 | /** |
||
205 | * @var Flow\Envelope |
||
206 | */ |
||
207 | private $writtenFlow; |
||
208 | |||
209 | /** |
||
210 | * @var EventLoop\TimerInterface[] |
||
211 | */ |
||
212 | private $timer = []; |
||
213 | |||
214 | /** |
||
215 | * @var Mqtt\Message|NULL |
||
216 | */ |
||
217 | private $lastMessage = NULL; |
||
218 | |||
/**
 * Creates the client and wires up its collaborators
 *
 * Both optional collaborators are declared with an explicit nullable type;
 * relying on a NULL default to make a parameter nullable is deprecated in PHP 8.4.
 *
 * @param EventLoop\LoopInterface $eventLoop                  loop used for timers and deferred callbacks
 * @param Configuration\Broker $configuration                 broker settings read when connecting
 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator  Mqtt\DefaultIdentifierGenerator is used when omitted
 * @param Mqtt\StreamParser|NULL $parser                      a new Mqtt\StreamParser is used when omitted
 */
public function __construct(
	EventLoop\LoopInterface $eventLoop,
	Configuration\Broker $configuration,
	?Mqtt\IdentifierGenerator $identifierGenerator = NULL,
	?Mqtt\StreamParser $parser = NULL
) {
	$this->loop = $eventLoop;
	$this->configuration = $configuration;

	$this->parser = $parser ?? new Mqtt\StreamParser;

	// Parser failures are surfaced through the client's own error event
	$this->parser->onError(function (Exception $ex) {
		$this->onError($ex, $this);
	});

	$this->identifierGenerator = $identifierGenerator ?? new Mqtt\DefaultIdentifierGenerator;
}
|
250 | |||
/**
 * {@inheritdoc}
 *
 * The loop can only be swapped while the client is neither connected nor connecting.
 *
 * @throws Exceptions\LogicException when a connection is established or in progress
 */
public function setLoop(EventLoop\LoopInterface $loop) : void
{
	if ($this->isConnected || $this->isConnecting) {
		throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
	}

	$this->loop = $loop;
}
||
263 | |||
/**
 * {@inheritdoc}
 *
 * @return EventLoop\LoopInterface the loop currently assigned to this client
 */
public function getLoop() : EventLoop\LoopInterface
{
	$currentLoop = $this->loop;

	return $currentLoop;
}
||
271 | |||
/**
 * {@inheritdoc}
 *
 * Replaces the broker configuration; refused while connected or connecting.
 *
 * @throws Exceptions\InvalidStateException
 */
public function setConfiguration(Configuration\Broker $configuration) : void
{
	$isBusy = $this->isConnected() || $this->isConnecting;

	if ($isBusy) {
		throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
	}

	$this->configuration = $configuration;
}
||
285 | |||
/**
 * {@inheritdoc}
 *
 * Delegates to the broker configuration.
 *
 * @throws Exceptions\InvalidStateException
 */
public function getUri() : string
{
	$uri = $this->configuration->getUri();

	return $uri;
}
||
295 | |||
/**
 * {@inheritdoc}
 *
 * Delegates to the broker configuration.
 */
public function getPort() : int
{
	$port = $this->configuration->getPort();

	return $port;
}
||
303 | |||
/**
 * {@inheritdoc}
 *
 * @return bool TRUE once the broker registration succeeded and until the client disconnects or the stream closes
 */
public function isConnected() : bool
{
	return $this->isConnected;
}
||
311 | |||
/**
 * {@inheritdoc}
 *
 * Opens the network stream and then registers the client with the broker.
 * The returned promise resolves with the Mqtt\Connection once registration
 * succeeded and is rejected with the causing exception otherwise.
 *
 * @throws Exceptions\InvalidStateException
 */
public function connect() : Promise\ExtendedPromiseInterface
{
	$isBusy = $this->isConnected || $this->isConnecting;

	if ($isBusy) {
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
	}

	$this->createConnector();

	$this->onStart();

	$this->isConnecting = TRUE;
	$this->isConnected = FALSE;

	$connection = $this->configuration->getConnection();

	// An empty client ID is replaced with a generated one
	if ($connection->getClientID() === '') {
		$generatedId = $this->identifierGenerator->generateClientID();

		$connection->setClientID($generatedId);
	}

	$deferred = new Promise\Deferred;

	// Registration with the broker succeeded
	$onRegistered = function (Mqtt\Connection $connection) use ($deferred) {
		$this->isConnecting = FALSE;
		$this->isConnected = TRUE;

		$this->connection = $connection;

		$this->onConnect($connection, $this);

		$deferred->resolve($this->connection);
	};

	// Registration failed: report, reject and tear the stream down again
	$onRegistrationFailed = function (Exception $ex) use ($deferred, $connection) {
		$this->isConnecting = FALSE;

		$this->onError($ex, $this);

		$deferred->reject($ex);

		if ($this->stream !== NULL) {
			$this->stream->close();
		}

		$this->onClose($connection, $this);
	};

	// Network stream is open, continue with the registration
	$onStreamOpened = function (Socket\ConnectionInterface $stream) use ($connection, $onRegistered, $onRegistrationFailed) {
		$this->stream = $stream;

		$this->onOpen($connection, $this);

		$this->registerClient($connection)
			->then($onRegistered)
			->otherwise($onRegistrationFailed);
	};

	// Network stream could not be opened
	$onStreamFailed = function (Exception $ex) use ($deferred) {
		$this->isConnecting = FALSE;

		$this->onError($ex, $this);

		$deferred->reject($ex);
	};

	$this->establishConnection()
		->then($onStreamOpened)
		->otherwise($onStreamFailed);

	return $deferred->promise();
}
||
379 | |||
/**
 * {@inheritdoc}
 *
 * Sends the disconnect flow and closes the stream once it finished.
 * The returned promise resolves with the Mqtt\Connection; when the flow
 * fails it is rejected with the current connection.
 */
public function disconnect() : Promise\ExtendedPromiseInterface
{
	$canDisconnect = $this->isConnected && !$this->isDisconnecting;

	if (!$canDisconnect) {
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}

	$this->isDisconnecting = TRUE;

	$deferred = new Promise\Deferred;

	$onDisconnected = function (Mqtt\Connection $connection) use ($deferred) {
		$this->isDisconnecting = FALSE;
		$this->isConnected = FALSE;

		$this->onDisconnect($connection, $this);

		$deferred->resolve($connection);

		if ($this->stream !== NULL) {
			$this->stream->close();
		}
	};

	$onFailed = function () use ($deferred) {
		$this->isDisconnecting = FALSE;
		$deferred->reject($this->connection);
	};

	// The flow is started silently, the disconnect event is raised by the handler above
	$disconnectFlow = new Mqtt\Flow\OutgoingDisconnectFlow($this->connection);

	$this->startFlow($disconnectFlow, TRUE)
		->then($onDisconnected)
		->otherwise($onFailed);

	return $deferred->promise();
}
||
413 | |||
/**
 * {@inheritdoc}
 *
 * Starts an outgoing subscribe flow for the given subscription;
 * the promise is rejected straight away when the client is not connected.
 */
public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
{
	if ($this->isConnected) {
		$flow = new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator);

		return $this->startFlow($flow);
	}

	return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
}
||
425 | |||
/**
 * {@inheritdoc}
 *
 * Starts an outgoing unsubscribe flow for the given subscription;
 * the promise is rejected straight away when the client is not connected.
 */
public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
{
	if ($this->isConnected) {
		$flow = new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator);

		return $this->startFlow($flow);
	}

	return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
}
||
437 | |||
/**
 * {@inheritdoc}
 *
 * Starts an outgoing publish flow for the given message;
 * the promise is rejected straight away when the client is not connected.
 */
public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
{
	if ($this->isConnected) {
		$flow = new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator);

		return $this->startFlow($flow);
	}

	return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
}
||
449 | |||
/**
 * {@inheritdoc}
 *
 * Registers a periodic timer which publishes the message with a payload
 * produced by the generator; the generator receives the message topic.
 * Each successful publish is reported through the promise's progress
 * notification, a failed publish rejects the promise.
 */
public function publishPeriodically(
	int $interval,
	Mqtt\Message $message,
	callable $generator
) : Promise\ExtendedPromiseInterface {
	if (!$this->isConnected) {
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}

	$deferred = new Promise\Deferred;

	$onPublished = function ($value) use ($deferred) {
		$deferred->notify($value);
	};

	$onFailed = function (Exception $e) use ($deferred) {
		$deferred->reject($e);
	};

	$tick = function () use ($message, $generator, $onPublished, $onFailed) {
		$payload = $generator($message->getTopic());

		$this->publish($message->withPayload($payload))
			->then($onPublished, $onFailed);
	};

	// The timer is remembered so it can be cancelled when the stream closes
	$this->timer[] = $this->loop->addPeriodicTimer($interval, $tick);

	return $deferred->promise();
}
||
478 | |||
/**
 * Establishes a network connection to a server
 *
 * The attempt is guarded by a timer of $this->timeout seconds. When the timer
 * fires, the returned promise is rejected and the pending attempt is cancelled,
 * so no socket is left open without an owner. Should a stream still arrive
 * after the timeout, it is closed right away instead of being wired up.
 *
 * @return Promise\ExtendedPromiseInterface resolves with the opened stream
 *
 * @throws Exceptions\InvalidStateException
 */
private function establishConnection() : Promise\ExtendedPromiseInterface
{
	$deferred = new Promise\Deferred;

	$timedOut = FALSE;

	$connecting = $this->connector->connect($this->configuration->getUri());

	$timer = $this->loop->addTimer($this->timeout, function () use ($deferred, $connecting, &$timedOut) {
		$timedOut = TRUE;

		$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));

		$deferred->reject($exception);

		// Abort the attempt which is still in flight
		if ($connecting instanceof Promise\CancellablePromiseInterface) {
			$connecting->cancel();
		}
	});

	$connecting
		->always(function () use ($timer) {
			$this->loop->cancelTimer($timer);
		})
		->then(function (Stream\DuplexStreamInterface $stream) use ($deferred, &$timedOut) {
			// The caller was already told about the timeout, do not keep the socket
			if ($timedOut) {
				$stream->close();

				return;
			}

			$stream->on('data', function ($data) {
				$this->handleReceive($data);
			});

			$stream->on('close', function () {
				$this->handleClose();
			});

			$stream->on('error', function (Exception $ex) {
				$this->handleError($ex);
			});

			$deferred->resolve($stream);
		})
		->otherwise(function (Exception $ex) use ($deferred) {
			$deferred->reject($ex);
		});

	return $deferred->promise();
}
||
521 | |||
/**
 * Registers a new client with the broker
 *
 * Starts the connect flow silently and rejects the returned promise when no
 * response arrives within $this->timeout seconds. After a successful
 * registration a periodic ping is scheduled at 75 % of the keep-alive period.
 *
 * A keep-alive of 0 disables the mechanism in MQTT, so no ping timer is
 * created then. The interval is never allowed to drop below one second,
 * because a zero interval would make the loop send pings continuously.
 *
 * @param Mqtt\Connection $connection
 *
 * @return Promise\ExtendedPromiseInterface resolves with the registered connection
 */
private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
{
	$deferred = new Promise\Deferred;

	$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
		$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));

		$deferred->reject($exception);
	});

	$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
		->always(function () use ($responseTimer) {
			$this->loop->cancelTimer($responseTimer);

		})->then(function (Mqtt\Connection $connection) use ($deferred) {
			$keepAlive = $connection->getKeepAlive();

			if ($keepAlive > 0) {
				$pingInterval = max(1.0, floor($keepAlive * 0.75));

				$this->timer[] = $this->loop->addPeriodicTimer($pingInterval, function () {
					$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
				});
			}

			$deferred->resolve($connection);

		})->otherwise(function (Exception $ex) use ($deferred) {
			$deferred->reject($ex);
		});

	return $deferred->promise();
}
||
556 | |||
/**
 * Handles incoming data
 *
 * Feeds the raw data to the stream parser and dispatches every packet it
 * returns. Data arriving while the client is neither connected nor
 * connecting is ignored.
 *
 * @param string $data
 *
 * @return void
 */
private function handleReceive(string $data) : void
{
	$isActive = $this->isConnected || $this->isConnecting;

	if (!$isActive) {
		return;
	}

	$flowsBefore = count($this->receivingFlows);

	foreach ($this->parser->push($data) as $packet) {
		$this->handlePacket($packet);
	}

	// Packet handling removes entries by key, so close the gaps afterwards
	if (count($this->receivingFlows) < $flowsBefore) {
		$this->receivingFlows = array_values($this->receivingFlows);
	}
}
||
582 | |||
/**
 * Handles an incoming packet
 *
 * A publish packet starts a new incoming publish flow. Every other known
 * packet type is offered to the waiting flows; the first flow accepting it
 * is removed from the waiting list and continued. Packets nobody is waiting
 * for, and packets of unknown type, are reported through the warning event.
 *
 * @param Mqtt\Packet $packet
 *
 * @return void
 */
private function handlePacket(Mqtt\Packet $packet) : void
{
	switch ($packet->getPacketType()) {
		case Mqtt\Packet::TYPE_PUBLISH:
			/* @var Mqtt\Packet\PublishRequestPacket $packet */
			$message = new Mqtt\DefaultMessage(
				$packet->getTopic(),
				$packet->getPayload(),
				$packet->getQosLevel(),
				$packet->isRetained(),
				$packet->isDuplicate()
			);

			$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($message, $packet->getIdentifier()));

			return;

		case Mqtt\Packet::TYPE_CONNACK:
		case Mqtt\Packet::TYPE_PINGRESP:
		case Mqtt\Packet::TYPE_SUBACK:
		case Mqtt\Packet::TYPE_UNSUBACK:
		case Mqtt\Packet::TYPE_PUBREL:
		case Mqtt\Packet::TYPE_PUBACK:
		case Mqtt\Packet::TYPE_PUBREC:
		case Mqtt\Packet::TYPE_PUBCOMP:
			$matchedIndex = NULL;

			foreach ($this->receivingFlows as $index => $candidate) {
				if ($candidate->accept($packet)) {
					$matchedIndex = $index;

					break;
				}
			}

			if ($matchedIndex === NULL) {
				$ex = new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $packet->getPacketType()));

				$this->onWarning($ex, $this);

				return;
			}

			$matchedFlow = $this->receivingFlows[$matchedIndex];

			unset($this->receivingFlows[$matchedIndex]);
			$this->continueFlow($matchedFlow, $packet);

			return;

		default:
			$ex = new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $packet->getPacketType()));

			$this->onWarning($ex, $this);
	}
}
||
640 | |||
/**
 * Handles outgoing packets
 *
 * Takes over the flow whose packet was written last, writes the next queued
 * flow (if any) to the stream, and then either schedules the taken-over flow
 * for finishing or parks it until its response arrives.
 *
 * @return void
 */
private function handleSend() : void
{
	$justWritten = $this->writtenFlow;
	$this->writtenFlow = NULL;

	if ($this->sendingFlows !== []) {
		$this->writtenFlow = array_shift($this->sendingFlows);
		$this->stream->write($this->writtenFlow->getPacket());
	}

	if ($justWritten === NULL) {
		return;
	}

	if (!$justWritten->isFinished()) {
		// The flow still expects a packet from the broker
		$this->receivingFlows[] = $justWritten;

		return;
	}

	$this->loop->futureTick(function () use ($justWritten) {
		$this->finishFlow($justWritten);
	});
}
||
671 | |||
/**
 * Handles closing of the stream
 *
 * Cancels every timer registered by this client and forgets them, so timers
 * of a previous session do not pile up when the client connects again.
 * The connection state is reset and, when a connection was registered,
 * the close event is raised.
 *
 * @return void
 */
private function handleClose() : void
{
	foreach ($this->timer as $timer) {
		$this->loop->cancelTimer($timer);
	}

	// Cancelled timers are of no further use, drop the references
	$this->timer = [];

	$connection = $this->connection;

	$this->isConnecting = FALSE;
	$this->isDisconnecting = FALSE;
	$this->isConnected = FALSE;
	$this->connection = NULL;
	$this->stream = NULL;

	if ($connection !== NULL) {
		$this->onClose($connection, $this);
	}
}
||
695 | |||
/**
 * Handles errors of the stream
 *
 * Forwards the exception to the listeners of the error event.
 *
 * @param Exception $ex
 *
 * @return void
 */
private function handleError(Exception $ex) : void
{
	$client = $this;

	$this->onError($ex, $client);
}
||
707 | |||
/**
 * Starts the given flow
 *
 * A flow which fails to start is reported through the error event and yields
 * a rejected promise. A flow without an initial packet is finished on the
 * next loop tick. Otherwise the packet is written immediately, or the flow
 * is queued when another flow is currently being written.
 *
 * @param Mqtt\Flow $flow
 * @param bool $isSilent  passed on to the flow envelope
 *
 * @return Promise\ExtendedPromiseInterface
 */
private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
{
	try {
		$packet = $flow->start();

	} catch (Exception $ex) {
		$this->onError($ex, $this);

		return new Promise\RejectedPromise($ex);
	}

	$deferred = new Promise\Deferred;
	$envelope = new Flow\Envelope($flow, $deferred, $packet, $isSilent);

	if ($packet === NULL) {
		$this->loop->futureTick(function () use ($envelope) {
			$this->finishFlow($envelope);
		});

		return $deferred->promise();
	}

	if ($this->writtenFlow === NULL) {
		$this->stream->write($packet);
		$this->writtenFlow = $envelope;

		$this->handleSend();

	} else {
		$this->sendingFlows[] = $envelope;
	}

	return $deferred->promise();
}
||
749 | |||
/**
 * Continues the given flow
 *
 * Hands the received packet to the flow. A failure is reported through the
 * error event. When the flow answers with a packet, it is written right away
 * or queued behind the flow currently being written; when it answers with
 * nothing and is finished, it is completed on the next loop tick.
 *
 * @param Flow\Envelope $flow
 * @param Mqtt\Packet $packet
 *
 * @return void
 */
private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
{
	try {
		$response = $flow->next($packet);

	} catch (Exception $ex) {
		$this->onError($ex, $this);

		return;
	}

	if ($response === NULL) {
		if ($flow->isFinished()) {
			$this->loop->futureTick(function () use ($flow) {
				$this->finishFlow($flow);
			});
		}

		return;
	}

	if ($this->writtenFlow === NULL) {
		$this->stream->write($response);
		$this->writtenFlow = $flow;

		$this->handleSend();

	} else {
		$this->sendingFlows[] = $flow;
	}
}
||
786 | |||
/**
 * Finishes the given flow
 *
 * A failed flow raises the warning event and rejects its promise. A
 * successful flow raises the event matching its code, unless the flow is
 * silent, and then resolves its promise with the flow result.
 *
 * For the 'message' code the message event is skipped when topic and payload
 * equal those of the previously delivered message.
 *
 * @param Flow\Envelope $flow
 *
 * @return void
 */
private function finishFlow(Flow\Envelope $flow) : void
{
	if (!$flow->isSuccess()) {
		$ex = new Exceptions\RuntimeException($flow->getErrorMessage());
		$this->onWarning($ex, $this);

		$flow->getDeferred()->reject($ex);

		return;
	}

	if (!$flow->isSilent()) {
		switch ($flow->getCode()) {
			case 'ping':
				$this->onPing($this);
				break;

			case 'pong':
				$this->onPong($this);
				break;

			case 'connect':
				$this->onConnect($flow->getResult(), $this);
				break;

			case 'disconnect':
				$this->onDisconnect($flow->getResult(), $this);
				break;

			case 'publish':
				$this->onPublish($flow->getResult(), $this);
				break;

			case 'subscribe':
				$this->onSubscribe($flow->getResult(), $this);
				break;

			case 'unsubscribe':
				$this->onUnsubscribe($flow->getResult(), $this);
				break;

			case 'message':
				/** @var Mqtt\Message $message */
				$message = $flow->getResult();

				$previous = $this->lastMessage;

				$isRepeated = $previous !== NULL
					&& $previous->getTopic() === $message->getTopic()
					&& $previous->getPayload() === $message->getPayload();

				// Leaves the switch only, the promise below is still resolved
				if ($isRepeated) {
					break;
				}

				$this->lastMessage = $message;

				$this->onMessage($message, $this);
				break;
		}
	}

	$flow->getDeferred()->resolve($flow->getResult());
}
||
857 | |||
/**
 * Creates the connector used for opening the network stream
 *
 * With SSL enabled in the configuration a secure connector wrapping a plain
 * TCP connector is used, otherwise the default socket connector.
 *
 * @return void
 */
private function createConnector() : void
{
	if (!$this->configuration->isSSLEnabled()) {
		$this->connector = new Socket\Connector($this->loop);

		return;
	}

	$tcpConnector = new Socket\TcpConnector($this->loop);

	$this->connector = new Socket\SecureConnector($tcpConnector, $this->loop, $this->configuration->getSSLConfiguration());
}
||
870 | } |
||
871 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.