Passed
Push — master ( 12828b...dadfd5 )
by Adam
02:24
created

Client::createConnector()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 0
cts 5
cp 0
rs 9.9666
c 0
b 0
f 0
cc 2
nc 2
nop 0
crap 6
1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Closure;
20
use Exception;
21
22
use Nette;
23
24
use React\EventLoop;
25
use React\Promise;
26
use React\Socket;
27
use React\Stream;
28
29
use BinSoul\Net\Mqtt;
30
31
use IPub\MQTTClient\Configuration;
32
use IPub\MQTTClient\Exceptions;
33
use IPub\MQTTClient\Flow;
34
35
/**
 * Connection client
 *
 * Asynchronous MQTT client built on top of a React event loop. It opens the
 * socket to the broker, performs the MQTT handshake and drives every
 * protocol flow (publish, subscribe, ping, ...), exposing the results both
 * as promises and as Nette events.
 *
 * @package        iPublikuj:MQTTClient!
 * @subpackage     Client
 *
 * @author         Adam Kadlec http://www.ipublikuj.eu
 *
 * @method onStart()
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
 * @method onClose(Mqtt\Connection $connection, IClient $client)
 * @method onPing(IClient $client)
 * @method onPong(IClient $client)
 * @method onPublish(Mqtt\Message $message, IClient $client)
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
 * @method onMessage(Mqtt\Message $message, IClient $client)
 * @method onWarning(Exception $ex, IClient $client)
 * @method onError(Exception $ex, IClient $client)
 */
final class Client implements IClient
{
	/**
	 * Implement nette smart magic
	 */
	use Nette\SmartObject;

	/**
	 * @var Closure[]
	 */
	public $onStart = [];

	/**
	 * @var Closure[]
	 */
	public $onOpen = [];

	/**
	 * @var Closure[]
	 */
	public $onConnect = [];

	/**
	 * @var Closure[]
	 */
	public $onDisconnect = [];

	/**
	 * @var Closure[]
	 */
	public $onClose = [];

	/**
	 * @var Closure[]
	 */
	public $onPing = [];

	/**
	 * @var Closure[]
	 */
	public $onPong = [];

	/**
	 * @var Closure[]
	 */
	public $onPublish = [];

	/**
	 * @var Closure[]
	 */
	public $onSubscribe = [];

	/**
	 * @var Closure[]
	 */
	public $onUnsubscribe = [];

	/**
	 * @var Closure[]
	 */
	public $onMessage = [];

	/**
	 * @var Closure[]
	 */
	public $onWarning = [];

	/**
	 * @var Closure[]
	 */
	public $onError = [];

	/**
	 * @var EventLoop\LoopInterface
	 */
	private $loop;

	/**
	 * @var Configuration\Broker
	 */
	private $configuration;

	/**
	 * Created lazily by createConnector() on every connect() call
	 *
	 * @var Socket\ConnectorInterface
	 */
	private $connector;

	/**
	 * Open socket to the broker, NULL while no socket is open
	 *
	 * @var Socket\ConnectionInterface|NULL
	 */
	private $stream = NULL;

	/**
	 * Connection acknowledged by the broker, reset to NULL when the stream closes
	 *
	 * @var Mqtt\Connection|NULL
	 */
	private $connection;

	/**
	 * @var Mqtt\StreamParser
	 */
	private $parser;

	/**
	 * @var Mqtt\IdentifierGenerator
	 */
	private $identifierGenerator;

	/**
	 * @var bool
	 */
	private $isConnected = FALSE;

	/**
	 * @var bool
	 */
	private $isConnecting = FALSE;

	/**
	 * @var bool
	 */
	private $isDisconnecting = FALSE;

	/**
	 * Not read anywhere in this class; the URI is taken from the configuration
	 *
	 * @var string
	 */
	private $host;

	/**
	 * Not read anywhere in this class; getPort() delegates to the configuration
	 *
	 * @var int
	 */
	private $port;

	/**
	 * Seconds to wait for the socket and for the broker handshake response
	 *
	 * @var int
	 */
	private $timeout = 5;

	/**
	 * Flows waiting for a packet from the broker
	 *
	 * @var Flow\Envelope[]
	 */
	private $receivingFlows = [];

	/**
	 * Flows queued because another flow was being written
	 *
	 * @var Flow\Envelope[]
	 */
	private $sendingFlows = [];

	/**
	 * Flow whose packet has just been written, NULL when nothing is in flight
	 *
	 * @var Flow\Envelope|NULL
	 */
	private $writtenFlow;

	/**
	 * Periodic timers (keep-alive ping, periodic publishing) cancelled on close
	 *
	 * @var EventLoop\TimerInterface[]
	 */
	private $timer = [];

	/**
	 * Last message delivered through onMessage, used to drop immediate repeats
	 *
	 * @var Mqtt\Message|NULL
	 */
	private $lastMessage = NULL;

	/**
	 * @param EventLoop\LoopInterface $eventLoop
	 * @param Configuration\Broker $configuration
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator defaults to Mqtt\DefaultIdentifierGenerator
	 * @param Mqtt\StreamParser|NULL $parser                     defaults to a fresh Mqtt\StreamParser
	 */
	public function __construct(
		EventLoop\LoopInterface $eventLoop,
		Configuration\Broker $configuration,
		?Mqtt\IdentifierGenerator $identifierGenerator = NULL,
		?Mqtt\StreamParser $parser = NULL
	) {
		$this->loop = $eventLoop;
		$this->configuration = $configuration;

		$this->parser = $parser ?? new Mqtt\StreamParser;

		// Parser failures are surfaced through the client's onError event
		$this->parser->onError(function (Exception $ex) {
			$this->onError($ex, $this);
		});

		$this->identifierGenerator = $identifierGenerator ?? new Mqtt\DefaultIdentifierGenerator;
	}

	/**
	 * {@inheritdoc}
	 *
	 * The loop can only be replaced while the client is neither connected nor connecting.
	 *
	 * @throws Exceptions\LogicException
	 */
	public function setLoop(EventLoop\LoopInterface $loop) : void
	{
		if ($this->isConnected || $this->isConnecting) {
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
		}

		$this->loop = $loop;
	}

	/**
	 * {@inheritdoc}
	 */
	public function getLoop() : EventLoop\LoopInterface
	{
		return $this->loop;
	}

	/**
	 * {@inheritdoc}
	 *
	 * @throws Exceptions\InvalidStateException
	 */
	public function setConfiguration(Configuration\Broker $configuration) : void
	{
		if ($this->isConnected() || $this->isConnecting) {
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
		}

		$this->configuration = $configuration;
	}

	/**
	 * {@inheritdoc}
	 *
	 * @throws Exceptions\InvalidStateException
	 */
	public function getUri() : string
	{
		return $this->configuration->getUri();
	}

	/**
	 * {@inheritdoc}
	 */
	public function getPort() : int
	{
		return $this->configuration->getPort();
	}

	/**
	 * {@inheritdoc}
	 */
	public function isConnected() : bool
	{
		return $this->isConnected;
	}

	/**
	 * {@inheritdoc}
	 *
	 * Opens the socket, then registers the client with the broker. The returned
	 * promise resolves with the acknowledged Mqtt\Connection; it is rejected when
	 * the client is already connected/connecting or when either step fails.
	 *
	 * @throws Exceptions\InvalidStateException
	 */
	public function connect() : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected || $this->isConnecting) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
		}

		$this->createConnector();

		$this->onStart();

		$this->isConnecting = TRUE;
		$this->isConnected = FALSE;

		$connection = $this->configuration->getConnection();

		// The broker needs a client identifier, generate one when none is configured
		if ($connection->getClientID() === '') {
			$connection->setClientID($this->identifierGenerator->generateClientID());
		}

		$deferred = new Promise\Deferred;

		$this->establishConnection()
			->then(function (Socket\ConnectionInterface $stream) use ($connection, $deferred) {
				$this->stream = $stream;

				$this->onOpen($connection, $this);

				$this->registerClient($connection)
					->then(function (Mqtt\Connection $connection) use ($deferred) {
						$this->isConnecting = FALSE;
						$this->isConnected = TRUE;

						$this->connection = $connection;

						$this->onConnect($connection, $this);

						$deferred->resolve($this->connection);
					})
					->otherwise(function (Exception $ex) use ($deferred, $connection) {
						// Handshake failed: report it, then tear the socket down
						$this->isConnecting = FALSE;

						$this->onError($ex, $this);

						$deferred->reject($ex);

						if ($this->stream !== NULL) {
							$this->stream->close();
						}

						$this->onClose($connection, $this);
					});
			})
			->otherwise(function (Exception $ex) use ($deferred) {
				// The socket could not be opened at all
				$this->isConnecting = FALSE;

				$this->onError($ex, $this);

				$deferred->reject($ex);
			});

		return $deferred->promise();
	}

	/**
	 * {@inheritdoc}
	 *
	 * Sends the MQTT disconnect packet and closes the stream afterwards. The
	 * promise resolves with the Mqtt\Connection; when the disconnect flow fails
	 * it is rejected with the reason of that failure.
	 */
	public function disconnect() : Promise\ExtendedPromiseInterface
	{
		if (!$this->isConnected || $this->isDisconnecting) {
			return $this->rejectNotConnected();
		}

		$this->isDisconnecting = TRUE;

		$deferred = new Promise\Deferred;

		$this->startFlow(new Mqtt\Flow\OutgoingDisconnectFlow($this->connection), TRUE)
			->then(function (Mqtt\Connection $connection) use ($deferred) {
				$this->isDisconnecting = FALSE;
				$this->isConnected = FALSE;

				$this->onDisconnect($connection, $this);

				$deferred->resolve($connection);

				if ($this->stream !== NULL) {
					$this->stream->close();
				}
			})
			->otherwise(function ($reason) use ($deferred) {
				$this->isDisconnecting = FALSE;

				// Propagate the real failure instead of the connection object,
				// which is not an error and may already have been reset to NULL
				$deferred->reject($reason);
			});

		return $deferred->promise();
	}

	/**
	 * {@inheritdoc}
	 */
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if (!$this->isConnected) {
			return $this->rejectNotConnected();
		}

		return $this->startFlow(new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator));
	}

	/**
	 * {@inheritdoc}
	 */
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if (!$this->isConnected) {
			return $this->rejectNotConnected();
		}

		return $this->startFlow(new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator));
	}

	/**
	 * {@inheritdoc}
	 */
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
	{
		if (!$this->isConnected) {
			return $this->rejectNotConnected();
		}

		return $this->startFlow(new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator));
	}

	/**
	 * {@inheritdoc}
	 *
	 * Every $interval seconds the generator is called with the message topic and
	 * its return value is published as the payload. Each successful publication
	 * is reported through the promise's progress notification; the first failed
	 * one rejects the promise. The timer itself keeps running until the stream
	 * is closed.
	 */
	public function publishPeriodically(
		int $interval,
		Mqtt\Message $message,
		callable $generator
	) : Promise\ExtendedPromiseInterface {
		if (!$this->isConnected) {
			return $this->rejectNotConnected();
		}

		$deferred = new Promise\Deferred;

		$this->timer[] = $this->loop->addPeriodicTimer($interval, function () use ($message, $generator, $deferred) {
			$payload = $generator($message->getTopic());

			$this->publish($message->withPayload($payload))
				->then(
					function ($value) use ($deferred) {
						$deferred->notify($value);
					},
					function (Exception $e) use ($deferred) {
						$deferred->reject($e);
					}
				);
		});

		return $deferred->promise();
	}

	/**
	 * Establishes a network connection to a server
	 *
	 * The promise resolves with the opened stream once the data, close and error
	 * listeners are attached. It is rejected when the connector fails or when
	 * nothing happened within the configured timeout.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 *
	 * @throws Exceptions\InvalidStateException
	 */
	private function establishConnection() : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		$this->connector->connect($this->configuration->getUri())
			->always(function () use ($timer) {
				// Whatever the outcome, the timeout guard is no longer needed
				$this->loop->cancelTimer($timer);
			})
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred) {
				$stream->on('data', function ($data) {
					$this->handleReceive($data);
				});

				$stream->on('close', function () {
					$this->handleClose();
				});

				$stream->on('error', function (Exception $ex) {
					$this->handleError($ex);
				});

				$deferred->resolve($stream);
			})
			->otherwise(function (Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}

	/**
	 * Registers a new client with the broker
	 *
	 * Runs the MQTT connect flow guarded by the response timeout and, once the
	 * broker accepted the connection, schedules the keep-alive ping.
	 *
	 * @param Mqtt\Connection $connection
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
			->always(function () use ($responseTimer) {
				$this->loop->cancelTimer($responseTimer);

			})->then(function (Mqtt\Connection $connection) use ($deferred) {
				$keepAlive = $connection->getKeepAlive();

				// A keep-alive of 0 turns the mechanism off in MQTT, so no ping is
				// scheduled. Otherwise ping at 75 % of the keep-alive period, but
				// never with a zero interval, which would flood the broker.
				if ($keepAlive > 0) {
					$interval = max(1.0, floor($keepAlive * 0.75));

					$this->timer[] = $this->loop->addPeriodicTimer($interval, function () {
						$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
					});
				}

				$deferred->resolve($connection);

			})->otherwise(function (Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}

	/**
	 * Handles incoming data
	 *
	 * Feeds the raw bytes to the stream parser and dispatches every packet that
	 * became complete. Data arriving while the client is neither connected nor
	 * connecting is ignored.
	 *
	 * @param string $data
	 *
	 * @return void
	 */
	private function handleReceive(string $data) : void
	{
		if (!$this->isConnected && !$this->isConnecting) {
			return;
		}

		$flowsBefore = count($this->receivingFlows);

		foreach ($this->parser->push($data) as $packet) {
			$this->handlePacket($packet);
		}

		// handlePacket() unsets matched flows; re-index when the list shrank
		if ($flowsBefore > count($this->receivingFlows)) {
			$this->receivingFlows = array_values($this->receivingFlows);
		}
	}

	/**
	 * Handles an incoming packet
	 *
	 * A publish packet starts a new incoming publish flow. Acknowledgement
	 * packets are handed to the first waiting flow that accepts them. Anything
	 * else, or an acknowledgement nobody waits for, is reported as a warning.
	 *
	 * @param Mqtt\Packet $packet
	 *
	 * @return void
	 */
	private function handlePacket(Mqtt\Packet $packet) : void
	{
		switch ($packet->getPacketType()) {
			case Mqtt\Packet::TYPE_PUBLISH:
				/* @var Mqtt\Packet\PublishRequestPacket $packet */
				$message = new Mqtt\DefaultMessage(
					$packet->getTopic(),
					$packet->getPayload(),
					$packet->getQosLevel(),
					$packet->isRetained(),
					$packet->isDuplicate()
				);

				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($message, $packet->getIdentifier()));
				break;

			case Mqtt\Packet::TYPE_CONNACK:
			case Mqtt\Packet::TYPE_PINGRESP:
			case Mqtt\Packet::TYPE_SUBACK:
			case Mqtt\Packet::TYPE_UNSUBACK:
			case Mqtt\Packet::TYPE_PUBREL:
			case Mqtt\Packet::TYPE_PUBACK:
			case Mqtt\Packet::TYPE_PUBREC:
			case Mqtt\Packet::TYPE_PUBCOMP:
				$flowFound = FALSE;

				foreach ($this->receivingFlows as $index => $flow) {
					if ($flow->accept($packet)) {
						$flowFound = TRUE;

						unset($this->receivingFlows[$index]);
						$this->continueFlow($flow, $packet);

						break;
					}
				}

				if (!$flowFound) {
					$ex = new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $packet->getPacketType()));

					$this->onWarning($ex, $this);
				}
				break;

			default:
				$ex = new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $packet->getPacketType()));

				$this->onWarning($ex, $this);
		}
	}

	/**
	 * Handles outgoing packets
	 *
	 * Takes over the flow that has just been written, writes the next queued
	 * flow (if any) and then either finishes the taken-over flow on the next
	 * loop tick or parks it until the broker answers.
	 *
	 * @return void
	 */
	private function handleSend() : void
	{
		$written = $this->writtenFlow;
		$this->writtenFlow = NULL;

		if (count($this->sendingFlows) > 0) {
			$this->writtenFlow = array_shift($this->sendingFlows);
			$this->stream->write($this->writtenFlow->getPacket());
		}

		if ($written === NULL) {
			return;
		}

		if ($written->isFinished()) {
			$this->loop->futureTick(function () use ($written) {
				$this->finishFlow($written);
			});

		} else {
			$this->receivingFlows[] = $written;
		}
	}

	/**
	 * Handles closing of the stream
	 *
	 * Cancels and forgets all timers, resets the connection state and fires
	 * onClose when a broker connection had been established.
	 *
	 * @return void
	 */
	private function handleClose() : void
	{
		foreach ($this->timer as $timer) {
			$this->loop->cancelTimer($timer);
		}

		// Drop the cancelled timers so they do not pile up across reconnects
		$this->timer = [];

		$connection = $this->connection;

		$this->isConnecting = FALSE;
		$this->isDisconnecting = FALSE;
		$this->isConnected = FALSE;
		$this->connection = NULL;
		$this->stream = NULL;

		if ($connection !== NULL) {
			$this->onClose($connection, $this);
		}
	}

	/**
	 * Handles errors of the stream
	 *
	 * @param Exception $ex
	 *
	 * @return void
	 */
	private function handleError(Exception $ex) : void
	{
		$this->onError($ex, $this);
	}

	/**
	 * Starts the given flow
	 *
	 * When the flow produces a packet it is written (or queued behind the flow
	 * currently being written). A flow without a packet is finished on the next
	 * loop tick. A flow that fails to start fires onError and yields a rejected
	 * promise.
	 *
	 * @param Mqtt\Flow $flow
	 * @param bool $isSilent when TRUE no client event is fired once the flow finishes
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
	{
		try {
			$packet = $flow->start();

		} catch (Exception $ex) {
			$this->onError($ex, $this);

			return new Promise\RejectedPromise($ex);
		}

		$deferred = new Promise\Deferred;
		$internalFlow = new Flow\Envelope($flow, $deferred, $packet, $isSilent);

		if ($packet !== NULL) {
			$this->writeOrQueue($internalFlow, $packet);

		} else {
			$this->loop->futureTick(function () use ($internalFlow) {
				$this->finishFlow($internalFlow);
			});
		}

		return $deferred->promise();
	}

	/**
	 * Continues the given flow
	 *
	 * Feeds the received packet to the flow. A response packet is written (or
	 * queued); a flow that finished without a response is completed on the next
	 * loop tick. A failure inside the flow fires onError.
	 *
	 * @param Flow\Envelope $flow
	 * @param Mqtt\Packet $packet
	 *
	 * @return void
	 */
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
	{
		try {
			$response = $flow->next($packet);

		} catch (Exception $ex) {
			$this->onError($ex, $this);

			return;
		}

		if ($response !== NULL) {
			$this->writeOrQueue($flow, $response);

		} elseif ($flow->isFinished()) {
			$this->loop->futureTick(function () use ($flow) {
				$this->finishFlow($flow);
			});
		}
	}

	/**
	 * Finishes the given flow
	 *
	 * A successful flow fires the event matching its code (unless the flow is
	 * silent) and resolves its promise with the flow result. A failed flow
	 * fires onWarning and rejects its promise.
	 *
	 * @param Flow\Envelope $flow
	 *
	 * @return void
	 */
	private function finishFlow(Flow\Envelope $flow) : void
	{
		if (!$flow->isSuccess()) {
			$ex = new Exceptions\RuntimeException($flow->getErrorMessage());
			$this->onWarning($ex, $this);

			$flow->getDeferred()->reject($ex);

			return;
		}

		if (!$flow->isSilent()) {
			switch ($flow->getCode()) {
				case 'ping':
					$this->onPing($this);
					break;

				case 'pong':
					$this->onPong($this);
					break;

				case 'connect':
					$this->onConnect($flow->getResult(), $this);
					break;

				case 'disconnect':
					$this->onDisconnect($flow->getResult(), $this);
					break;

				case 'publish':
					$this->onPublish($flow->getResult(), $this);
					break;

				case 'subscribe':
					$this->onSubscribe($flow->getResult(), $this);
					break;

				case 'unsubscribe':
					$this->onUnsubscribe($flow->getResult(), $this);
					break;

				case 'message':
					/** @var Mqtt\Message $message */
					$message = $flow->getResult();

					$previous = $this->lastMessage;

					// A message repeating the topic and payload of the previous
					// one is not announced again; its promise is still resolved
					if (
						$previous !== NULL
						&& $previous->getTopic() === $message->getTopic()
						&& $previous->getPayload() === $message->getPayload()
					) {
						break;
					}

					$this->lastMessage = $message;

					$this->onMessage($message, $this);
					break;
			}
		}

		$flow->getDeferred()->resolve($flow->getResult());
	}

	/**
	 * Creates the connector used to open the socket
	 *
	 * A TLS-wrapped TCP connector is used when SSL is enabled in the
	 * configuration, the default React connector otherwise.
	 *
	 * @return void
	 */
	private function createConnector() : void
	{
		if ($this->configuration->isSSLEnabled()) {
			// NOTE(review): a bare TcpConnector performs no DNS lookup - confirm that
			// TLS brokers are configured by IP address, or resolve the host first
			$this->connector = new Socket\SecureConnector(
				new Socket\TcpConnector($this->loop),
				$this->loop,
				$this->configuration->getSSLConfiguration()
			);

			return;
		}

		$this->connector = new Socket\Connector($this->loop);
	}

	/**
	 * Builds the rejection returned by operations that need an established connection
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	private function rejectNotConnected() : Promise\ExtendedPromiseInterface
	{
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}

	/**
	 * Writes the packet of the given flow, or queues the flow while another one is being written
	 *
	 * @param Flow\Envelope $flow
	 * @param Mqtt\Packet $packet packet to put on the wire when nothing else is in flight
	 *
	 * @return void
	 */
	private function writeOrQueue(Flow\Envelope $flow, $packet) : void
	{
		if ($this->writtenFlow !== NULL) {
			$this->sendingFlows[] = $flow;

			return;
		}

		$this->stream->write($packet);
		$this->writtenFlow = $flow;

		$this->handleSend();
	}
}
871