Passed
Push — master ( 5a21ba...beb1d4 )
by Adam
02:10
created

Client::publish()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 6
1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Closure;
20
use Exception;
21
22
use Nette;
23
24
use React\EventLoop;
25
use React\Dns;
26
use React\Promise;
27
use React\Socket;
28
use React\Stream;
29
30
use BinSoul\Net\Mqtt;
31
32
use IPub\MQTTClient\Configuration;
33
use IPub\MQTTClient\Exceptions;
34
use IPub\MQTTClient\Flow;
35
36
/**
37
 * Connection client
38
 *
39
 * @package        iPublikuj:MQTTClient!
40
 * @subpackage     Client
41
 *
42
 * @author         Adam Kadlec <[email protected]>
43
 *
44
 * @method onStart()
45
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
46
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
47
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
48
 * @method onClose(Mqtt\Connection $connection, IClient $client)
49
 * @method onPing(IClient $client)
50
 * @method onPong(IClient $client)
51
 * @method onPublish(Mqtt\Message $message, IClient $client)
52
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
53
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
54
 * @method onMessage(Mqtt\Message $message, IClient $client)
55
 * @method onWarning(Exception $ex, IClient $client)
56
 * @method onError(Exception $ex, IClient $client)
57
 */
58 1
final class Client implements IClient
59
{
60
	/**
61
	 * Implement nette smart magic
62
	 */
63 1
	use Nette\SmartObject;
64
65
	/**
66
	 * @var Closure
67
	 */
68
	public $onStart = [];
69
70
	/**
71
	 * @var Closure
72
	 */
73
	public $onOpen = [];
74
75
	/**
76
	 * @var Closure
77
	 */
78
	public $onConnect = [];
79
80
	/**
81
	 * @var Closure
82
	 */
83
	public $onDisconnect = [];
84
85
	/**
86
	 * @var Closure
87
	 */
88
	public $onClose = [];
89
90
	/**
91
	 * @var Closure
92
	 */
93
	public $onPing = [];
94
95
	/**
96
	 * @var Closure
97
	 */
98
	public $onPong = [];
99
100
	/**
101
	 * @var Closure
102
	 */
103
	public $onPublish = [];
104
105
	/**
106
	 * @var Closure
107
	 */
108
	public $onSubscribe = [];
109
110
	/**
111
	 * @var Closure
112
	 */
113
	public $onUnsubscribe = [];
114
115
	/**
116
	 * @var Closure
117
	 */
118
	public $onMessage = [];
119
120
	/**
121
	 * @var Closure
122
	 */
123
	public $onWarning = [];
124
125
	/**
126
	 * @var Closure
127
	 */
128
	public $onError = [];
129
130
	/**
131
	 * @var EventLoop\LoopInterface
132
	 */
133
	private $loop;
134
135
	/**
136
	 * @var Configuration\Broker
137
	 */
138
	private $configuration;
139
140
	/**
141
	 * @var Socket\ConnectorInterface
142
	 */
143
	private $connector;
144
145
	/**
146
	 * @var Socket\ConnectionInterface|NULL
147
	 */
148
	private $stream = NULL;
149
150
	/**
151
	 * @var Mqtt\Connection
152
	 */
153
	private $connection;
154
155
	/**
156
	 * @var Mqtt\StreamParser
157
	 */
158
	private $parser;
159
160
	/**
161
	 * @var Mqtt\IdentifierGenerator
162
	 */
163
	private $identifierGenerator;
164
165
	/**
166
	 * @var bool
167
	 */
168
	private $isConnected = FALSE;
169
170
	/**
171
	 * @var bool
172
	 */
173
	private $isConnecting = FALSE;
174
175
	/**
176
	 * @var bool
177
	 */
178
	private $isDisconnecting = FALSE;
179
180
	/**
181
	 * @var string
182
	 */
183
	private $host;
184
185
	/**
186
	 * @var int
187
	 */
188
	private $port;
189
190
	/**
191
	 * @var int
192
	 */
193
	private $timeout = 5;
194
195
	/**
196
	 * @var Flow\Envelope[]
197
	 */
198
	private $receivingFlows = [];
199
200
	/**
201
	 * @var Flow\Envelope[]
202
	 */
203
	private $sendingFlows = [];
204
205
	/**
206
	 * @var Flow\Envelope
207
	 */
208
	private $writtenFlow;
209
210
	/**
211
	 * @var EventLoop\TimerInterface[]
212
	 */
213
	private $timer = [];
214
215
	/**
216
	 * @var Mqtt\Message|NULL
217
	 */
218
	private $lastMessage = NULL;
219
220
	/**
221
	 * @param EventLoop\LoopInterface $eventLoop
222
	 * @param Configuration\Broker $configuration
223
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator
224
	 * @param Mqtt\StreamParser|NULL $parser
225
	 */
226
	public function __construct(
227
		EventLoop\LoopInterface $eventLoop,
228
		Configuration\Broker $configuration,
229
		Mqtt\IdentifierGenerator $identifierGenerator = NULL,
230
		Mqtt\StreamParser $parser = NULL
231
	) {
232 1
		$this->loop = $eventLoop;
233 1
		$this->configuration = $configuration;
234
235 1
		$this->createConnector();
236
237 1
		$this->parser = $parser;
238
239 1
		if ($this->parser === NULL) {
240 1
			$this->parser = new Mqtt\StreamParser;
241
		}
242
243 1
		$this->parser->onError(function (Exception $ex) {
244
			$this->onError($ex, $this);
245 1
		});
246
247 1
		$this->identifierGenerator = $identifierGenerator;
248
249 1
		if ($this->identifierGenerator === NULL) {
250 1
			$this->identifierGenerator = new Mqtt\DefaultIdentifierGenerator;
0 ignored issues
show
Documentation Bug introduced by
It seems like new \BinSoul\Net\Mqtt\DefaultIdentifierGenerator() of type object<BinSoul\Net\Mqtt\...ultIdentifierGenerator> is incompatible with the declared type object<BinSoul\Net\Mqtt\IdentifierGenerator> of property $identifierGenerator.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
251
		}
252 1
	}
253
254
	/**
255
	 * {@inheritdoc}
256
	 */
257
	public function setLoop(EventLoop\LoopInterface $loop) : void
258
	{
259
		if (!$this->isConnected && !$this->isConnecting) {
260
			$this->loop = $loop;
261
262
			$this->createConnector();
263
264
		} else {
265
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
266
		}
267
	}
268
269
	/**
270
	 * {@inheritdoc}
271
	 */
272
	public function getLoop() : EventLoop\LoopInterface
273
	{
274
		return $this->loop;
275
	}
276
277
	/**
278
	 * {@inheritdoc}
279
	 *
280
	 * @throws Exceptions\InvalidStateException
281
	 */
282
	public function setConfiguration(Configuration\Broker $configuration) : void
283
	{
284
		if ($this->isConnected() || $this->isConnecting) {
285
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
286
		}
287
288
		$this->configuration = $configuration;
289
	}
290
291
	/**
292
	 * {@inheritdoc}
293
	 *
294
	 * @throws Exceptions\InvalidStateException
295
	 */
296
	public function getUri() : string
297
	{
298
		return $this->configuration->getUri();
299
	}
300
301
	/**
302
	 * {@inheritdoc}
303
	 */
304
	public function getPort() : int
305
	{
306
		return $this->configuration->getPort();
307
	}
308
309
	/**
310
	 * {@inheritdoc}
311
	 */
312
	public function isConnected() : bool
313
	{
314
		return $this->isConnected;
315
	}
316
317
	/**
318
	 * {@inheritdoc}
319
	 *
320
	 * @throws Exceptions\InvalidStateException
321
	 */
322
	public function connect() : Promise\ExtendedPromiseInterface
323
	{
324
		if ($this->isConnected || $this->isConnecting) {
325
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
326
		}
327
328
		$this->onStart();
329
330
		$this->isConnecting = TRUE;
331
		$this->isConnected = FALSE;
332
333
		$connection = $this->configuration->getConnection();
334
335
		if ($connection->getClientID() === '') {
336
			$connection->setClientID($this->identifierGenerator->generateClientID());
337
		}
338
339
		$deferred = new Promise\Deferred;
340
341
		$this->establishConnection()
342
			->then(function (Socket\ConnectionInterface $stream) use ($connection, $deferred) {
343
				$this->stream = $stream;
344
345
				$this->onOpen($connection, $this);
346
347
				$this->registerClient($connection)
348
					->then(function (Mqtt\Connection $connection) use ($deferred) {
349
						$this->isConnecting = FALSE;
350
						$this->isConnected = TRUE;
351
352
						$this->connection = $connection;
353
354
						$this->onConnect($connection, $this);
355
356
						$deferred->resolve($this->connection);
357
					})
358
					->otherwise(function (Exception $ex) use ($stream, $deferred, $connection) {
359
						$this->isConnecting = FALSE;
360
361
						$this->onError($ex, $this);
362
363
						$deferred->reject($ex);
364
365
						if ($this->stream !== NULL) {
366
							$this->stream->close();
367
						}
368
369
						$this->onClose($connection, $this);
370
					});
371
			})
372
			->otherwise(function (Exception $ex) use ($deferred) {
373
				$this->isConnecting = FALSE;
374
375
				$this->onError($ex, $this);
376
377
				$deferred->reject($ex);
378
			});
379
380
		return $deferred->promise();
381
	}
382
383
	/**
384
	 * {@inheritdoc}
385
	 */
386
	public function disconnect() : Promise\ExtendedPromiseInterface
387
	{
388
		if (!$this->isConnected || $this->isDisconnecting) {
389
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
390
		}
391
392
		$this->isDisconnecting = TRUE;
393
394
		$deferred = new Promise\Deferred;
395
396
		$this->startFlow(new Mqtt\Flow\OutgoingDisconnectFlow($this->connection), TRUE)
397
			->then(function (Mqtt\Connection $connection) use ($deferred) {
398
				$this->isDisconnecting = FALSE;
399
				$this->isConnected = FALSE;
400
401
				$this->onDisconnect($connection, $this);
402
403
				$deferred->resolve($connection);
404
405
				if ($this->stream !== NULL) {
406
					$this->stream->close();
407
				}
408
			})
409
			->otherwise(function () use ($deferred) {
410
				$this->isDisconnecting = FALSE;
411
				$deferred->reject($this->connection);
412
			});
413
414
		return $deferred->promise();
415
	}
416
417
	/**
418
	 * {@inheritdoc}
419
	 */
420 View Code Duplication
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
421
	{
422
		if (!$this->isConnected) {
423
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
424
		}
425
426
		return $this->startFlow(new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator));
427
	}
428
429
	/**
430
	 * {@inheritdoc}
431
	 */
432 View Code Duplication
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
433
	{
434
		if (!$this->isConnected) {
435
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
436
		}
437
438
		return $this->startFlow(new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator));
439
	}
440
441
	/**
442
	 * {@inheritdoc}
443
	 */
444
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
445
	{
446
		if (!$this->isConnected) {
447
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
448
		}
449
450
		return $this->startFlow(new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator));
451
	}
452
453
	/**
454
	 * {@inheritdoc}
455
	 */
456
	public function publishPeriodically(
457
		int $interval,
458
		Mqtt\Message $message,
459
		callable $generator
460
	) : Promise\ExtendedPromiseInterface {
461
		if (!$this->isConnected) {
462
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
463
		}
464
465
		$deferred = new Promise\Deferred;
466
467
		$this->timer[] = $this->loop->addPeriodicTimer($interval, function () use ($message, $generator, $deferred) {
468
			$this->publish($message->withPayload($generator($message->getTopic())))
469
				->then(
470
					function ($value) use ($deferred) {
471
						$deferred->notify($value);
472
					},
473
					function (Exception $e) use ($deferred) {
474
						$deferred->reject($e);
475
					}
476
				);
477
		});
478
479
		return $deferred->promise();
480
	}
481
482
	/**
483
	 * Establishes a network connection to a server
484
	 *
485
	 * @return Promise\ExtendedPromiseInterface
486
	 *
487
	 * @throws Exceptions\InvalidStateException
488
	 */
489
	private function establishConnection() : Promise\ExtendedPromiseInterface
490
	{
491
		$deferred = new Promise\Deferred;
492
493
		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
494
			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));
495
496
			$deferred->reject($exception);
497
		});
498
499
		$this->connector->connect($this->configuration->getUri())
500
			->always(function () use ($timer) {
501
				$this->loop->cancelTimer($timer);
502
			})
503
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred) {
504
				$stream->on('data', function ($data) {
505
					$this->handleReceive($data);
506
				});
507
508
				$stream->on('close', function () {
509
					$this->handleClose();
510
				});
511
512
				$stream->on('error', function (Exception $ex) {
513
					$this->handleError($ex);
514
				});
515
516
				$deferred->resolve($stream);
517
			})
518
			->otherwise(function (Exception $ex) use ($deferred) {
519
				$deferred->reject($ex);
520
			});
521
522
		return $deferred->promise();
523
	}
524
525
	/**
526
	 * Registers a new client with the broker
527
	 *
528
	 * @param Mqtt\Connection $connection
529
	 *
530
	 * @return Promise\ExtendedPromiseInterface
531
	 */
532
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
533
	{
534
		$deferred = new Promise\Deferred;
535
536
		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
537
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));
538
539
			$deferred->reject($exception);
540
		});
541
542
		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
543
			->always(function () use ($responseTimer) {
544
				$this->loop->cancelTimer($responseTimer);
545
546
			})->then(function (Mqtt\Connection $connection) use ($deferred) {
547
				$this->timer[] = $this->loop->addPeriodicTimer(floor($connection->getKeepAlive() * 0.75), function () {
548
					$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
549
				});
550
551
				$deferred->resolve($connection);
552
553
			})->otherwise(function (Exception $ex) use ($deferred) {
554
				$deferred->reject($ex);
555
			});
556
557
		return $deferred->promise();
558
	}
559
560
	/**
561
	 * Handles incoming data
562
	 *
563
	 * @param string $data
564
	 *
565
	 * @return void
566
	 */
567
	private function handleReceive(string $data) : void
568
	{
569
		if (!$this->isConnected && !$this->isConnecting) {
570
			return;
571
		}
572
573
		$flowCount = count($this->receivingFlows);
574
575
		$packets = $this->parser->push($data);
576
577
		foreach ($packets as $packet) {
578
			$this->handlePacket($packet);
579
		}
580
581
		if ($flowCount > count($this->receivingFlows)) {
582
			$this->receivingFlows = array_values($this->receivingFlows);
583
		}
584
	}
585
586
	/**
587
	 * Handles an incoming packet
588
	 *
589
	 * @param Mqtt\Packet $packet
590
	 *
591
	 * @return void
592
	 */
593
	private function handlePacket(Mqtt\Packet $packet) : void
594
	{
595
		switch ($packet->getPacketType()) {
596
			case Mqtt\Packet::TYPE_PUBLISH:
597
				/* @var Mqtt\Packet\PublishRequestPacket $packet */
598
				$message = new Mqtt\DefaultMessage(
599
					$packet->getTopic(),
600
					$packet->getPayload(),
601
					$packet->getQosLevel(),
602
					$packet->isRetained(),
603
					$packet->isDuplicate()
604
				);
605
606
				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($message, $packet->getIdentifier()));
607
				break;
608
609
			case Mqtt\Packet::TYPE_CONNACK:
610
			case Mqtt\Packet::TYPE_PINGRESP:
611
			case Mqtt\Packet::TYPE_SUBACK:
612
			case Mqtt\Packet::TYPE_UNSUBACK:
613
			case Mqtt\Packet::TYPE_PUBREL:
614
			case Mqtt\Packet::TYPE_PUBACK:
615
			case Mqtt\Packet::TYPE_PUBREC:
616
			case Mqtt\Packet::TYPE_PUBCOMP:
617
				$flowFound = FALSE;
618
619
				foreach ($this->receivingFlows as $index => $flow) {
620
					if ($flow->accept($packet)) {
621
						$flowFound = TRUE;
622
623
						unset($this->receivingFlows[$index]);
624
						$this->continueFlow($flow, $packet);
625
626
						break;
627
					}
628
				}
629
630
				if (!$flowFound) {
631
					$ex = new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $packet->getPacketType()));
632
633
					$this->onWarning($ex, $this);
634
				}
635
				break;
636
637
			default:
638
				$ex = new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $packet->getPacketType()));
639
640
				$this->onWarning($ex, $this);
641
		}
642
	}
643
644
	/**
645
	 * Handles outgoing packets
646
	 *
647
	 * @return void
648
	 */
649
	private function handleSend() : void
650
	{
651
		$flow = NULL;
652
653
		if ($this->writtenFlow !== NULL) {
654
			$flow = $this->writtenFlow;
655
			$this->writtenFlow = NULL;
656
		}
657
658
		if (count($this->sendingFlows) > 0) {
659
			$this->writtenFlow = array_shift($this->sendingFlows);
660
			$this->stream->write($this->writtenFlow->getPacket());
661
		}
662
663
		if ($flow !== NULL) {
664
			if ($flow->isFinished()) {
665
				$this->loop->futureTick(function () use ($flow) {
666
					$this->finishFlow($flow);
667
				});
668
669
			} else {
670
				$this->receivingFlows[] = $flow;
671
			}
672
		}
673
	}
674
675
	/**
676
	 * Handles closing of the stream
677
	 *
678
	 * @return void
679
	 */
680
	private function handleClose() : void
681
	{
682
		foreach ($this->timer as $timer) {
683
			$this->loop->cancelTimer($timer);
684
		}
685
686
		$connection = $this->connection;
687
688
		$this->isConnecting = FALSE;
689
		$this->isDisconnecting = FALSE;
690
		$this->isConnected = FALSE;
691
		$this->connection = NULL;
692
		$this->stream = NULL;
693
694
		if ($connection !== NULL) {
695
			$this->onClose($connection, $this);
696
		}
697
	}
698
699
	/**
700
	 * Handles errors of the stream
701
	 *
702
	 * @param Exception $ex
703
	 *
704
	 * @return void
705
	 */
706
	private function handleError(Exception $ex) : void
707
	{
708
		$this->onError($ex, $this);
709
	}
710
711
	/**
712
	 * Starts the given flow
713
	 *
714
	 * @param Mqtt\Flow $flow
715
	 * @param bool $isSilent
716
	 *
717
	 * @return Promise\ExtendedPromiseInterface
718
	 */
719
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
720
	{
721
		try {
722
			$packet = $flow->start();
723
724
		} catch (Exception $ex) {
725
			$this->onError($ex, $this);
726
727
			return new Promise\RejectedPromise($ex);
728
		}
729
730
		$deferred = new Promise\Deferred;
731
		$internalFlow = new Flow\Envelope($flow, $deferred, $packet, $isSilent);
732
733 View Code Duplication
		if ($packet !== NULL) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
734
			if ($this->writtenFlow !== NULL) {
735
				$this->sendingFlows[] = $internalFlow;
736
737
			} else {
738
				$this->stream->write($packet);
739
				$this->writtenFlow = $internalFlow;
740
741
				$this->handleSend();
742
			}
743
744
		} else {
745
			$this->loop->futureTick(function () use ($internalFlow) {
746
				$this->finishFlow($internalFlow);
747
			});
748
		}
749
750
		return $deferred->promise();
751
	}
752
753
	/**
754
	 * Continues the given flow
755
	 *
756
	 * @param Flow\Envelope $flow
757
	 * @param Mqtt\Packet $packet
758
	 *
759
	 * @return void
760
	 */
761
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
762
	{
763
		try {
764
			$response = $flow->next($packet);
765
766
		} catch (Exception $ex) {
767
			$this->onError($ex, $this);
768
769
			return;
770
		}
771
772 View Code Duplication
		if ($response !== NULL) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
773
			if ($this->writtenFlow !== NULL) {
774
				$this->sendingFlows[] = $flow;
775
776
			} else {
777
				$this->stream->write($response);
778
				$this->writtenFlow = $flow;
779
780
				$this->handleSend();
781
			}
782
783
		} elseif ($flow->isFinished()) {
784
			$this->loop->futureTick(function () use ($flow) {
785
				$this->finishFlow($flow);
786
			});
787
		}
788
	}
789
790
	/**
791
	 * Finishes the given flow
792
	 *
793
	 * @param Flow\Envelope $flow
794
	 *
795
	 * @return void
796
	 */
797
	private function finishFlow(Flow\Envelope $flow) : void
798
	{
799
		if ($flow->isSuccess()) {
800
			if (!$flow->isSilent()) {
801
				switch ($flow->getCode()) {
802
					case 'ping':
803
						$this->onPing($this);
804
						break;
805
806
					case 'pong':
807
						$this->onPong($this);
808
						break;
809
810
					case 'connect':
811
						$this->onConnect($flow->getResult(), $this);
812
						break;
813
814
					case 'disconnect':
815
						$this->onDisconnect($flow->getResult(), $this);
816
						break;
817
818
					case 'publish':
819
						$this->onPublish($flow->getResult(), $this);
820
						break;
821
822
					case 'subscribe':
823
						$this->onSubscribe($flow->getResult(), $this);
824
						break;
825
826
					case 'unsubscribe':
827
						$this->onUnsubscribe($flow->getResult(), $this);
828
						break;
829
830
					case 'message':
831
						/** @var Mqtt\Message $message */
832
						$message = $flow->getResult();
833
834
						if ($this->lastMessage === NULL) {
835
							$this->lastMessage = $flow->getResult();
836
837
						} elseif (
838
							$this->lastMessage->getTopic() === $message->getTopic()
839
							&& $this->lastMessage->getPayload() === $message->getPayload()
840
						) {
841
							break;
842
						}
843
844
						$this->lastMessage = $message;
845
846
						$this->onMessage($message, $this);
847
						break;
848
				}
849
			}
850
851
			$flow->getDeferred()->resolve($flow->getResult());
852
853
		} else {
854
			$ex = new Exceptions\RuntimeException($flow->getErrorMessage());
855
			$this->onWarning($ex, $this);
856
857
			$flow->getDeferred()->reject($ex);
858
		}
859
	}
860
861
	/**
862
	 * @return void
863
	 */
864
	private function createConnector() : void
865
	{
866 1
		$this->connector = new Socket\TcpConnector($this->loop);
0 ignored issues
show
Documentation Bug introduced by
It seems like new \React\Socket\TcpConnector($this->loop) of type object<React\Socket\TcpConnector> is incompatible with the declared type object<React\Socket\ConnectorInterface> of property $connector.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
867
868 1
		if ($this->configuration->isDNSEnabled()) {
869 1
			$dnsResolverFactory = new Dns\Resolver\Factory;
870
871 1
			$this->connector = new Socket\DnsConnector($this->connector, $dnsResolverFactory->createCached($this->configuration->getDNSAddress(), $this->loop));
0 ignored issues
show
Documentation Bug introduced by
It seems like new \React\Socket\DnsCon...ddress(), $this->loop)) of type object<React\Socket\DnsConnector> is incompatible with the declared type object<React\Socket\ConnectorInterface> of property $connector.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
872
		}
873
874 1
		if ($this->configuration->isSSLEnabled()) {
875
			$this->connector = new Socket\SecureConnector($this->connector, $this->loop, $this->configuration->getSSLConfiguration());
0 ignored issues
show
Documentation Bug introduced by
It seems like new \React\Socket\Secure...>getSSLConfiguration()) of type object<React\Socket\SecureConnector> is incompatible with the declared type object<React\Socket\ConnectorInterface> of property $connector.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
876
		}
877 1
	}
878
}
879