Passed
Push — master ( 958a85...6386e9 )
by Adam
50s queued 10s
created

src/IPub/MQTTClient/Client/Client.php (8 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Nette;
20
21
use React\EventLoop;
22
use React\Dns;
23
use React\Promise;
24
use React\Socket;
25
use React\Stream;
26
27
use BinSoul\Net\Mqtt;
28
29
use IPub\MQTTClient\Exceptions;
30
use IPub\MQTTClient\Flow;
31
32
/**
33
 * Connection client
34
 *
35
 * @package        iPublikuj:MQTTClient!
36
 * @subpackage     Client
37
 *
38
 * @author         Adam Kadlec <[email protected]>
39
 *
40
 * @method onStart()
41
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
42
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
43
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
44
 * @method onClose(Mqtt\Connection $connection, IClient $client)
45
 * @method onPing(IClient $client)
46
 * @method onPong(IClient $client)
47
 * @method onPublish(Mqtt\Message $message, IClient $client)
48
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
49
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
50
 * @method onMessage(Mqtt\Message $message, IClient $client)
51
 * @method onWarning(\Exception $ex, IClient $client)
52
 * @method onError(\Exception $ex, IClient $client)
53
 */
54 1
final class Client implements IClient
55
{
56
	/**
57
	 * Implement nette smart magic
58
	 */
59 1
	use Nette\SmartObject;
60
61
	/**
62
	 * @var \Closure
63
	 */
64
	public $onStart = [];
65
66
	/**
67
	 * @var \Closure
68
	 */
69
	public $onOpen = [];
70
71
	/**
72
	 * @var \Closure
73
	 */
74
	public $onConnect = [];
75
76
	/**
77
	 * @var \Closure
78
	 */
79
	public $onDisconnect = [];
80
81
	/**
82
	 * @var \Closure
83
	 */
84
	public $onClose = [];
85
86
	/**
87
	 * @var \Closure
88
	 */
89
	public $onPing = [];
90
91
	/**
92
	 * @var \Closure
93
	 */
94
	public $onPong = [];
95
96
	/**
97
	 * @var \Closure
98
	 */
99
	public $onPublish = [];
100
101
	/**
102
	 * @var \Closure
103
	 */
104
	public $onSubscribe = [];
105
106
	/**
107
	 * @var \Closure
108
	 */
109
	public $onUnsubscribe = [];
110
111
	/**
112
	 * @var \Closure
113
	 */
114
	public $onMessage = [];
115
116
	/**
117
	 * @var \Closure
118
	 */
119
	public $onWarning = [];
120
121
	/**
122
	 * @var \Closure
123
	 */
124
	public $onError = [];
125
126
	/**
127
	 * @var EventLoop\LoopInterface
128
	 */
129
	private $loop;
130
131
	/**
132
	 * @var Configuration
133
	 */
134
	private $configuration;
135
136
	/**
137
	 * @var Socket\ConnectorInterface
138
	 */
139
	private $connector;
140
141
	/**
142
	 * @var Socket\ConnectionInterface|NULL
143
	 */
144
	private $stream = NULL;
145
146
	/**
147
	 * @var Mqtt\Connection
148
	 */
149
	private $connection;
150
151
	/**
152
	 * @var Mqtt\StreamParser
153
	 */
154
	private $parser;
155
156
	/**
157
	 * @var Mqtt\IdentifierGenerator
158
	 */
159
	private $identifierGenerator;
160
161
	/**
162
	 * @var bool
163
	 */
164
	private $isConnected = FALSE;
165
166
	/**
167
	 * @var bool
168
	 */
169
	private $isConnecting = FALSE;
170
171
	/**
172
	 * @var bool
173
	 */
174
	private $isDisconnecting = FALSE;
175
176
	/**
177
	 * @var string
178
	 */
179
	private $host;
180
181
	/**
182
	 * @var int
183
	 */
184
	private $port;
185
186
	/**
187
	 * @var int
188
	 */
189
	private $timeout = 5;
190
191
	/**
192
	 * @var Flow\Envelope[]
193
	 */
194
	private $receivingFlows = [];
195
196
	/**
197
	 * @var Flow\Envelope[]
198
	 */
199
	private $sendingFlows = [];
200
201
	/**
202
	 * @var Flow\Envelope
203
	 */
204
	private $writtenFlow;
205
206
	/**
207
	 * @var EventLoop\Timer\TimerInterface[]
208
	 */
209
	private $timer = [];
210
211
	/**
212
	 * @var Mqtt\Message|NULL
213
	 */
214
	private $lastMessage = NULL;
215
216
	/**
217
	 * @param EventLoop\LoopInterface $eventLoop
218
	 * @param Configuration $configuration
219
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator
220
	 * @param Mqtt\StreamParser|NULL $parser
221
	 */
222
	public function __construct(
223
		EventLoop\LoopInterface $eventLoop,
224
		Configuration $configuration,
225
		Mqtt\IdentifierGenerator $identifierGenerator = NULL,
226
		Mqtt\StreamParser $parser = NULL
227
	) {
228 1
		$this->loop = $eventLoop;
229 1
		$this->configuration = $configuration;
230
231 1
		$this->createConnector();
232
233 1
		$this->parser = $parser;
234
235 1
		if ($this->parser === NULL) {
236 1
			$this->parser = new Mqtt\StreamParser;
237
		}
238
239 1
		$this->parser->onError(function (\Exception $ex) {
240
			$this->onError($ex, $this);
241 1
		});
242
243 1
		$this->identifierGenerator = $identifierGenerator;
244
245 1
		if ($this->identifierGenerator === NULL) {
246 1
			$this->identifierGenerator = new Mqtt\DefaultIdentifierGenerator;
0 ignored issues
show
Documentation Bug introduced by
It seems like new \BinSoul\Net\Mqtt\DefaultIdentifierGenerator() of type object<BinSoul\Net\Mqtt\...ultIdentifierGenerator> is incompatible with the declared type object<BinSoul\Net\Mqtt\IdentifierGenerator> of property $identifierGenerator.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
247
		}
248 1
	}
249
250
	/**
251
	 * {@inheritdoc}
252
	 */
253
	public function setLoop(EventLoop\LoopInterface $loop) : void
254
	{
255
		if (!$this->isConnected && !$this->isConnecting) {
256
			$this->loop = $loop;
257
258
			$this->createConnector();
259
260
		} else {
261
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
262
		}
263
	}
264
265
	/**
266
	 * {@inheritdoc}
267
	 */
268
	public function getLoop() : EventLoop\LoopInterface
269
	{
270
		return $this->loop;
271
	}
272
273
	/**
274
	 * {@inheritdoc}
275
	 */
276
	public function setConfiguration(Configuration $configuration) : void
277
	{
278
		if ($this->isConnected() || $this->isConnecting) {
279
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
280
		}
281
282
		$this->configuration = $configuration;
283
	}
284
285
	/**
286
	 * {@inheritdoc}
287
	 */
288
	public function getUri() : string
289
	{
290
		return $this->configuration->getUri();
291
	}
292
293
	/**
294
	 * {@inheritdoc}
295
	 */
296
	public function getPort() : int
297
	{
298
		return $this->configuration->getPort();
299
	}
300
301
	/**
302
	 * {@inheritdoc}
303
	 */
304
	public function isConnected() : bool
305
	{
306
		return $this->isConnected;
307
	}
308
309
	/**
310
	 * {@inheritdoc}
311
	 */
312
	public function connect() : Promise\ExtendedPromiseInterface
313
	{
314
		if ($this->isConnected || $this->isConnecting) {
315
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
316
		}
317
318
		$this->onStart();
319
320
		$this->isConnecting = TRUE;
321
		$this->isConnected = FALSE;
322
323
		$connection = $this->configuration->getConnection();
324
325
		if ($connection->getClientID() === '') {
326
			$connection = $connection->withClientID($this->identifierGenerator->generateClientID());
327
		}
328
329
		$deferred = new Promise\Deferred;
330
331
		$this->establishConnection()
332
			->then(function (Socket\ConnectionInterface $stream) use ($connection, $deferred) {
333
				$this->stream = $stream;
334
335
				$this->onOpen($connection, $this);
336
337
				$this->registerClient($connection)
338
					->then(function (Mqtt\Connection $connection) use ($deferred) {
339
						$this->isConnecting = FALSE;
340
						$this->isConnected = TRUE;
341
342
						$this->connection = $connection;
343
344
						$this->onConnect($connection, $this);
345
346
						$deferred->resolve($this->connection);
347
					})
348
					->otherwise(function (\Exception $ex) use ($stream, $deferred, $connection) {
349
						$this->isConnecting = FALSE;
350
351
						$this->onError($ex, $this);
352
353
						$deferred->reject($ex);
354
355
						if ($this->stream !== NULL) {
356
							$this->stream->close();
357
						}
358
359
						$this->onClose($connection, $this);
360
					});
361
			})
362
			->otherwise(function (\Exception $ex) use ($deferred) {
363
				$this->isConnecting = FALSE;
364
365
				$this->onError($ex, $this);
366
367
				$deferred->reject($ex);
368
			});
369
370
		return $deferred->promise();
371
	}
372
373
	/**
374
	 * {@inheritdoc}
375
	 */
376
	public function disconnect() : Promise\ExtendedPromiseInterface
377
	{
378
		if (!$this->isConnected || $this->isDisconnecting) {
379
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
380
		}
381
382
		$this->isDisconnecting = TRUE;
383
384
		$deferred = new Promise\Deferred;
385
386
		$this->startFlow(new Mqtt\Flow\OutgoingDisconnectFlow($this->connection), TRUE)
387
			->then(function (Mqtt\Connection $connection) use ($deferred) {
388
				$this->isDisconnecting = FALSE;
389
				$this->isConnected = FALSE;
390
391
				$this->onDisconnect($connection, $this);
392
393
				$deferred->resolve($connection);
394
395
				if ($this->stream !== NULL) {
396
					$this->stream->close();
397
				}
398
			})
399
			->otherwise(function () use ($deferred) {
400
				$this->isDisconnecting = FALSE;
401
				$deferred->reject($this->connection);
402
			});
403
404
		return $deferred->promise();
405
	}
406
407
	/**
408
	 * {@inheritdoc}
409
	 */
410 View Code Duplication
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
0 ignored issues
show
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
411
	{
412
		if (!$this->isConnected) {
413
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
414
		}
415
416
		return $this->startFlow(new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator));
417
	}
418
419
	/**
420
	 * {@inheritdoc}
421
	 */
422 View Code Duplication
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
0 ignored issues
show
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
423
	{
424
		if (!$this->isConnected) {
425
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
426
		}
427
428
		return $this->startFlow(new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator));
429
	}
430
431
	/**
432
	 * {@inheritdoc}
433
	 */
434
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
435
	{
436
		if (!$this->isConnected) {
437
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
438
		}
439
440
		return $this->startFlow(new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator));
441
	}
442
443
	/**
444
	 * {@inheritdoc}
445
	 */
446
	public function publishPeriodically(
447
		int $interval,
448
		Mqtt\Message $message,
449
		callable $generator
450
	) : Promise\ExtendedPromiseInterface {
451
		if (!$this->isConnected) {
452
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
453
		}
454
455
		$deferred = new Promise\Deferred;
456
457
		$this->timer[] = $this->loop->addPeriodicTimer($interval, function () use ($message, $generator, $deferred) {
458
			$this->publish($message->withPayload($generator($message->getTopic())))
459
				->then(
460
					function ($value) use ($deferred) {
461
						$deferred->notify($value);
462
					},
463
					function (\Exception $e) use ($deferred) {
464
						$deferred->reject($e);
465
					}
466
				);
467
		});
468
469
		return $deferred->promise();
470
	}
471
472
	/**
473
	 * Establishes a network connection to a server
474
	 *
475
	 * @return Promise\ExtendedPromiseInterface
476
	 */
477
	private function establishConnection() : Promise\ExtendedPromiseInterface
478
	{
479
		$deferred = new Promise\Deferred;
480
481
		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
482
			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));
483
484
			$deferred->reject($exception);
485
		});
486
487
		$this->connector->connect($this->configuration->getUri())
488
			->always(function () use ($timer) {
489
				$this->loop->cancelTimer($timer);
490
			})
491
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred) {
492
				$stream->on('data', function ($data) {
493
					$this->handleReceive($data);
494
				});
495
496
				$stream->on('close', function () {
497
					$this->handleClose();
498
				});
499
500
				$stream->on('error', function (\Exception $ex) {
501
					$this->handleError($ex);
502
				});
503
504
				$deferred->resolve($stream);
505
			})
506
			->otherwise(function (\Exception $ex) use ($deferred) {
507
				$deferred->reject($ex);
508
			});
509
510
		return $deferred->promise();
511
	}
512
513
	/**
514
	 * Registers a new client with the broker
515
	 *
516
	 * @param Mqtt\Connection $connection
517
	 *
518
	 * @return Promise\ExtendedPromiseInterface
519
	 */
520
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
521
	{
522
		$deferred = new Promise\Deferred;
523
524
		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
525
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));
526
527
			$deferred->reject($exception);
528
		});
529
530
		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
531
			->always(function () use ($responseTimer) {
532
				$this->loop->cancelTimer($responseTimer);
533
534
			})->then(function (Mqtt\Connection $connection) use ($deferred) {
535
				$this->timer[] = $this->loop->addPeriodicTimer(floor($connection->getKeepAlive() * 0.75), function () {
536
					$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
537
				});
538
539
				$deferred->resolve($connection);
540
541
			})->otherwise(function (\Exception $ex) use ($deferred) {
542
				$deferred->reject($ex);
543
			});
544
545
		return $deferred->promise();
546
	}
547
548
	/**
549
	 * Handles incoming data
550
	 *
551
	 * @param string $data
552
	 *
553
	 * @return void
554
	 */
555
	private function handleReceive(string $data) : void
556
	{
557
		if (!$this->isConnected && !$this->isConnecting) {
558
			return;
559
		}
560
561
		$flowCount = count($this->receivingFlows);
562
563
		$packets = $this->parser->push($data);
564
565
		foreach ($packets as $packet) {
566
			$this->handlePacket($packet);
567
		}
568
569
		if ($flowCount > count($this->receivingFlows)) {
570
			$this->receivingFlows = array_values($this->receivingFlows);
571
		}
572
	}
573
574
	/**
575
	 * Handles an incoming packet
576
	 *
577
	 * @param Mqtt\Packet $packet
578
	 *
579
	 * @return void
580
	 */
581
	private function handlePacket(Mqtt\Packet $packet) : void
582
	{
583
		switch ($packet->getPacketType()) {
584
			case Mqtt\Packet::TYPE_PUBLISH:
585
				/* @var Mqtt\Packet\PublishRequestPacket $packet */
586
				$message = new Mqtt\DefaultMessage(
587
					$packet->getTopic(),
588
					$packet->getPayload(),
589
					$packet->getQosLevel(),
590
					$packet->isRetained(),
591
					$packet->isDuplicate()
592
				);
593
594
				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($message, $packet->getIdentifier()));
595
				break;
596
597
			case Mqtt\Packet::TYPE_CONNACK:
598
			case Mqtt\Packet::TYPE_PINGRESP:
599
			case Mqtt\Packet::TYPE_SUBACK:
600
			case Mqtt\Packet::TYPE_UNSUBACK:
601
			case Mqtt\Packet::TYPE_PUBREL:
602
			case Mqtt\Packet::TYPE_PUBACK:
603
			case Mqtt\Packet::TYPE_PUBREC:
604
			case Mqtt\Packet::TYPE_PUBCOMP:
605
				$flowFound = FALSE;
606
607
				foreach ($this->receivingFlows as $index => $flow) {
608
					if ($flow->accept($packet)) {
609
						$flowFound = TRUE;
610
611
						unset($this->receivingFlows[$index]);
612
						$this->continueFlow($flow, $packet);
613
614
						break;
615
					}
616
				}
617
618
				if (!$flowFound) {
619
					$ex = new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $packet->getPacketType()));
620
621
					$this->onWarning($ex, $this);
622
				}
623
				break;
624
625
			default:
626
				$ex = new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $packet->getPacketType()));
627
628
				$this->onWarning($ex, $this);
629
		}
630
	}
631
632
	/**
633
	 * Handles outgoing packets
634
	 *
635
	 * @return void
636
	 */
637
	private function handleSend() : void
638
	{
639
		$flow = NULL;
640
641
		if ($this->writtenFlow !== NULL) {
642
			$flow = $this->writtenFlow;
643
			$this->writtenFlow = NULL;
644
		}
645
646
		if (count($this->sendingFlows) > 0) {
647
			$this->writtenFlow = array_shift($this->sendingFlows);
648
			$this->stream->write($this->writtenFlow->getPacket());
649
		}
650
651
		if ($flow !== NULL) {
652
			if ($flow->isFinished()) {
653
				$this->loop->futureTick(function () use ($flow) {
654
					$this->finishFlow($flow);
655
				});
656
657
			} else {
658
				$this->receivingFlows[] = $flow;
659
			}
660
		}
661
	}
662
663
	/**
664
	 * Handles closing of the stream
665
	 *
666
	 * @return void
667
	 */
668
	private function handleClose() : void
669
	{
670
		foreach ($this->timer as $timer) {
671
			$this->loop->cancelTimer($timer);
672
		}
673
674
		$connection = $this->connection;
675
676
		$this->isConnecting = FALSE;
677
		$this->isDisconnecting = FALSE;
678
		$this->isConnected = FALSE;
679
		$this->connection = NULL;
680
		$this->stream = NULL;
681
682
		if ($connection !== NULL) {
683
			$this->onClose($connection, $this);
684
		}
685
	}
686
687
	/**
688
	 * Handles errors of the stream
689
	 *
690
	 * @param \Exception $ex
691
	 *
692
	 * @return void
693
	 */
694
	private function handleError(\Exception $ex) : void
695
	{
696
		$this->onError($ex, $this);
697
	}
698
699
	/**
700
	 * Starts the given flow
701
	 *
702
	 * @param Mqtt\Flow $flow
703
	 * @param bool $isSilent
704
	 *
705
	 * @return Promise\ExtendedPromiseInterface
706
	 */
707
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
708
	{
709
		try {
710
			$packet = $flow->start();
711
712
		} catch (\Exception $ex) {
713
			$this->onError($ex, $this);
714
715
			return new Promise\RejectedPromise($ex);
716
		}
717
718
		$deferred = new Promise\Deferred;
719
		$internalFlow = new Flow\Envelope($flow, $deferred, $packet, $isSilent);
720
721 View Code Duplication
		if ($packet !== NULL) {
0 ignored issues
show
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
722
			if ($this->writtenFlow !== NULL) {
723
				$this->sendingFlows[] = $internalFlow;
724
725
			} else {
726
				$this->stream->write($packet);
727
				$this->writtenFlow = $internalFlow;
728
729
				$this->handleSend();
730
			}
731
732
		} else {
733
			$this->loop->futureTick(function () use ($internalFlow) {
734
				$this->finishFlow($internalFlow);
735
			});
736
		}
737
738
		return $deferred->promise();
739
	}
740
741
	/**
742
	 * Continues the given flow
743
	 *
744
	 * @param Flow\Envelope $flow
745
	 * @param Mqtt\Packet $packet
746
	 *
747
	 * @return void
748
	 */
749
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
750
	{
751
		try {
752
			$response = $flow->next($packet);
753
754
		} catch (\Exception $ex) {
755
			$this->onError($ex, $this);
756
757
			return;
758
		}
759
760 View Code Duplication
		if ($response !== NULL) {
0 ignored issues
show
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
761
			if ($this->writtenFlow !== NULL) {
762
				$this->sendingFlows[] = $flow;
763
764
			} else {
765
				$this->stream->write($response);
766
				$this->writtenFlow = $flow;
767
768
				$this->handleSend();
769
			}
770
771
		} elseif ($flow->isFinished()) {
772
			$this->loop->futureTick(function () use ($flow) {
773
				$this->finishFlow($flow);
774
			});
775
		}
776
	}
777
778
	/**
779
	 * Finishes the given flow
780
	 *
781
	 * @param Flow\Envelope $flow
782
	 *
783
	 * @return void
784
	 */
785
	private function finishFlow(Flow\Envelope $flow) : void
786
	{
787
		if ($flow->isSuccess()) {
788
			if (!$flow->isSilent()) {
789
				switch ($flow->getCode()) {
790
					case 'ping':
791
						$this->onPing($this);
792
						break;
793
794
					case 'pong':
795
						$this->onPong($this);
796
						break;
797
798
					case 'connect':
799
						$this->onConnect($flow->getResult(), $this);
800
						break;
801
802
					case 'disconnect':
803
						$this->onDisconnect($flow->getResult(), $this);
804
						break;
805
806
					case 'publish':
807
						$this->onPublish($flow->getResult(), $this);
808
						break;
809
810
					case 'subscribe':
811
						$this->onSubscribe($flow->getResult(), $this);
812
						break;
813
814
					case 'unsubscribe':
815
						$this->onUnsubscribe($flow->getResult(), $this);
816
						break;
817
818
					case 'message':
819
						/** @var Mqtt\Message $message */
820
						$message = $flow->getResult();
821
822
						if ($this->lastMessage === NULL) {
823
							$this->lastMessage = $flow->getResult();
824
825
						} elseif (
826
							$this->lastMessage->getTopic() === $message->getTopic()
827
							&& $this->lastMessage->getPayload() === $message->getPayload()
828
						) {
829
							break;
830
						}
831
832
						$this->lastMessage = $message;
833
834
						$this->onMessage($message, $this);
835
						break;
836
				}
837
			}
838
839
			$flow->getDeferred()->resolve($flow->getResult());
840
841
		} else {
842
			$ex = new Exceptions\RuntimeException($flow->getErrorMessage());
843
			$this->onWarning($ex, $this);
844
845
			$flow->getDeferred()->reject($ex);
846
		}
847
	}
848
849
	/**
850
	 * @return void
851
	 */
852
	private function createConnector() : void
853
	{
854 1
		$this->connector = new Socket\TcpConnector($this->loop);
0 ignored issues
show
Documentation Bug introduced by
It seems like new \React\Socket\TcpConnector($this->loop) of type object<React\Socket\TcpConnector> is incompatible with the declared type object<React\Socket\ConnectorInterface> of property $connector.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
855
856 1
		if ($this->configuration->isDNSEnabled()) {
857 1
			$dnsResolverFactory = new Dns\Resolver\Factory;
858
859 1
			$this->connector = new Socket\DnsConnector($this->connector, $dnsResolverFactory->createCached($this->configuration->getDNSAddress(), $this->loop));
0 ignored issues
show
Documentation Bug introduced by
It seems like new \React\Socket\DnsCon...ddress(), $this->loop)) of type object<React\Socket\DnsConnector> is incompatible with the declared type object<React\Socket\ConnectorInterface> of property $connector.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
860
		}
861
862 1
		if ($this->configuration->isSSLEnabled()) {
863
			$this->connector = new Socket\SecureConnector($this->connector, $this->loop, $this->configuration->getSSLConfiguration());
0 ignored issues
show
Documentation Bug introduced by
It seems like new \React\Socket\Secure...>getSSLConfiguration()) of type object<React\Socket\SecureConnector> is incompatible with the declared type object<React\Socket\ConnectorInterface> of property $connector.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
864
		}
865 1
	}
866
}
867