Passed
Push — master ( 958a85...6386e9 )
by Adam
50s queued 10s
created

src/IPub/MQTTClient/Client/Client.php (4 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Nette;
20
21
use React\EventLoop;
22
use React\Dns;
23
use React\Promise;
24
use React\Socket;
25
use React\Stream;
26
27
use BinSoul\Net\Mqtt;
28
29
use IPub\MQTTClient\Exceptions;
30
use IPub\MQTTClient\Flow;
31
32
/**
33
 * Connection client
34
 *
35
 * @package        iPublikuj:MQTTClient!
36
 * @subpackage     Client
37
 *
38
 * @author         Adam Kadlec <[email protected]>
39
 *
40
 * @method onStart()
41
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
42
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
43
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
44
 * @method onClose(Mqtt\Connection $connection, IClient $client)
45
 * @method onPing(IClient $client)
46
 * @method onPong(IClient $client)
47
 * @method onPublish(Mqtt\Message $message, IClient $client)
48
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
49
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
50
 * @method onMessage(Mqtt\Message $message, IClient $client)
51
 * @method onWarning(\Exception $ex, IClient $client)
52
 * @method onError(\Exception $ex, IClient $client)
53
 */
54 1
final class Client implements IClient
55
{
56
	/**
57
	 * Implement nette smart magic
58
	 */
59 1
	use Nette\SmartObject;
60
61
	/**
62
	 * @var callable[]
63
	 */
64
	public $onStart = [];
65
66
	/**
67
	 * @var callable[]
68
	 */
69
	public $onOpen = [];
70
71
	/**
72
	 * @var callable[]
73
	 */
74
	public $onConnect = [];
75
76
	/**
77
	 * @var callable[]
78
	 */
79
	public $onDisconnect = [];
80
81
	/**
82
	 * @var callable[]
83
	 */
84
	public $onClose = [];
85
86
	/**
87
	 * @var callable[]
88
	 */
89
	public $onPing = [];
90
91
	/**
92
	 * @var callable[]
93
	 */
94
	public $onPong = [];
95
96
	/**
97
	 * @var callable[]
98
	 */
99
	public $onPublish = [];
100
101
	/**
102
	 * @var callable[]
103
	 */
104
	public $onSubscribe = [];
105
106
	/**
107
	 * @var callable[]
108
	 */
109
	public $onUnsubscribe = [];
110
111
	/**
112
	 * @var callable[]
113
	 */
114
	public $onMessage = [];
115
116
	/**
117
	 * @var callable[]
118
	 */
119
	public $onWarning = [];
120
121
	/**
122
	 * @var callable[]
123
	 */
124
	public $onError = [];
125
126
	/**
127
	 * @var EventLoop\LoopInterface
128
	 */
129
	private $loop;
130
131
	/**
132
	 * @var Configuration
133
	 */
134
	private $configuration;
135
136
	/**
137
	 * @var Socket\ConnectorInterface
138
	 */
139
	private $connector;
140
141
	/**
142
	 * @var Socket\ConnectionInterface|NULL
143
	 */
144
	private $stream = NULL;
145
146
	/**
147
	 * @var Mqtt\Connection|NULL
148
	 */
149
	private $connection;
150
151
	/**
152
	 * @var Mqtt\StreamParser
153
	 */
154
	private $parser;
155
156
	/**
157
	 * @var Mqtt\IdentifierGenerator
158
	 */
159
	private $identifierGenerator;
160
161
	/**
162
	 * @var bool
163
	 */
164
	private $isConnected = FALSE;
165
166
	/**
167
	 * @var bool
168
	 */
169
	private $isConnecting = FALSE;
170
171
	/**
172
	 * @var bool
173
	 */
174
	private $isDisconnecting = FALSE;
175
176
	/**
177
	 * @var string
178
	 */
179
	private $host;
180
181
	/**
182
	 * @var int
183
	 */
184
	private $port;
185
186
	/**
187
	 * @var int
188
	 */
189
	private $timeout = 5;
190
191
	/**
192
	 * @var Flow\Envelope[]
193
	 */
194
	private $receivingFlows = [];
195
196
	/**
197
	 * @var Flow\Envelope[]
198
	 */
199
	private $sendingFlows = [];
200
201
	/**
202
	 * @var Flow\Envelope|NULL
203
	 */
204
	private $writtenFlow;
205
206
	/**
207
	 * @var EventLoop\Timer\TimerInterface[]
208
	 */
209
	private $timer = [];
210
211
	/**
212
	 * @var Mqtt\Message|NULL
213
	 */
214
	private $lastMessage = NULL;
215
216
	/**
217
	 * @param EventLoop\LoopInterface $eventLoop
218
	 * @param Configuration $configuration
219
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator
220
	 * @param Mqtt\StreamParser|NULL $parser
221
	 */
222
	public function __construct(
		EventLoop\LoopInterface $eventLoop,
		Configuration $configuration,
		?Mqtt\IdentifierGenerator $identifierGenerator = NULL,
		?Mqtt\StreamParser $parser = NULL
	) {
		$this->loop = $eventLoop;
		$this->configuration = $configuration;

		// The connector is assembled from the loop and the configuration set above
		$this->createConnector();

		// Use the library's stock parser when none was injected
		$this->parser = $parser ?? new Mqtt\StreamParser;

		// Parser failures are forwarded to the client's onError event
		$this->parser->onError(function (\Exception $ex) {
			$this->onError($ex, $this);
		});

		// Use the library's stock identifier generator when none was injected
		$this->identifierGenerator = $identifierGenerator ?? new Mqtt\DefaultIdentifierGenerator;
	}
249
250
	/**
251
	 * {@inheritdoc}
252
	 */
253
	public function setLoop(EventLoop\LoopInterface $loop) : void
	{
		// The loop may only be swapped while no connection exists or is being opened
		if ($this->isConnected || $this->isConnecting) {
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
		}

		$this->loop = $loop;

		// The connector is built from the loop, so it has to be recreated
		$this->createConnector();
	}
264
265
	/**
266
	 * {@inheritdoc}
267
	 */
268
	public function getLoop() : EventLoop\LoopInterface
	{
		// The loop this client is currently bound to
		return $this->loop;
	}
272
273
	/**
274
	 * {@inheritdoc}
275
	 */
276
	public function setConfiguration(Configuration $configuration) : void
	{
		if ($this->isConnected() || $this->isConnecting) {
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
		}

		$this->configuration = $configuration;

		// createConnector() reads the DNS and SSL settings from the configuration,
		// so a connector built for the previous configuration would be stale
		$this->createConnector();
	}
284
285
	/**
286
	 * {@inheritdoc}
287
	 */
288
	public function getUri() : string
	{
		// Delegated to the active configuration
		return $this->configuration->getUri();
	}
292
293
	/**
294
	 * {@inheritdoc}
295
	 */
296
	public function getPort() : int
	{
		// Delegated to the active configuration
		return $this->configuration->getPort();
	}
300
301
	/**
302
	 * {@inheritdoc}
303
	 */
304
	public function isConnected() : bool
	{
		// TRUE only between a successful connect() and the next disconnect/close
		return $this->isConnected;
	}
308
309
	/**
310
	 * {@inheritdoc}
311
	 */
312
	public function connect() : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected || $this->isConnecting) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
		}

		$this->onStart();

		$this->isConnecting = TRUE;
		$this->isConnected = FALSE;

		$connection = $this->configuration->getConnection();

		// An empty client id is replaced by a generated one
		if ($connection->getClientID() === '') {
			$connection = $connection->withClientID($this->identifierGenerator->generateClientID());
		}

		$result = new Promise\Deferred;

		// Registration with the broker succeeded
		$onRegistered = function (Mqtt\Connection $connection) use ($result) {
			$this->isConnecting = FALSE;
			$this->isConnected = TRUE;

			$this->connection = $connection;

			$this->onConnect($connection, $this);

			$result->resolve($this->connection);
		};

		// Registration failed: report it, then tear the stream down again
		$onRegistrationFailed = function (\Exception $ex) use ($result, $connection) {
			$this->isConnecting = FALSE;

			$this->onError($ex, $this);

			$result->reject($ex);

			if ($this->stream !== NULL) {
				$this->stream->close();
			}

			$this->onClose($connection, $this);
		};

		// The network stream is open, the client can now register itself
		$onStreamOpened = function (Socket\ConnectionInterface $stream) use ($connection, $onRegistered, $onRegistrationFailed) {
			$this->stream = $stream;

			$this->onOpen($connection, $this);

			$this->registerClient($connection)
				->then($onRegistered)
				->otherwise($onRegistrationFailed);
		};

		// The network stream could not be opened
		$onStreamFailed = function (\Exception $ex) use ($result) {
			$this->isConnecting = FALSE;

			$this->onError($ex, $this);

			$result->reject($ex);
		};

		$this->establishConnection()
			->then($onStreamOpened)
			->otherwise($onStreamFailed);

		return $result->promise();
	}
372
373
	/**
374
	 * {@inheritdoc}
375
	 */
376
	public function disconnect() : Promise\ExtendedPromiseInterface
	{
		if ($this->isDisconnecting || !$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$this->isDisconnecting = TRUE;

		$result = new Promise\Deferred;

		// The disconnect flow finished: announce it, then close the stream
		$onDisconnected = function (Mqtt\Connection $connection) use ($result) {
			$this->isDisconnecting = FALSE;
			$this->isConnected = FALSE;

			$this->onDisconnect($connection, $this);

			$result->resolve($connection);

			if ($this->stream !== NULL) {
				$this->stream->close();
			}
		};

		// The disconnect flow failed: the rejection carries the current connection
		$onFailed = function () use ($result) {
			$this->isDisconnecting = FALSE;
			$result->reject($this->connection);
		};

		$this->startFlow(new Mqtt\Flow\OutgoingDisconnectFlow($this->connection), TRUE)
			->then($onDisconnected)
			->otherwise($onFailed);

		return $result->promise();
	}
406
407
	/**
408
	 * {@inheritdoc}
409
	 */
410 View Code Duplication
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		// Subscribing requires an established connection
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
418
419
	/**
420
	 * {@inheritdoc}
421
	 */
422 View Code Duplication
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		// Unsubscribing requires an established connection
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
430
431
	/**
432
	 * {@inheritdoc}
433
	 */
434
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		// Publishing requires an established connection
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
442
443
	/**
444
	 * {@inheritdoc}
445
	 */
446
	public function publishPeriodically(
		int $interval,
		Mqtt\Message $message,
		callable $generator
	) : Promise\ExtendedPromiseInterface {
		if (!$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$deferred = new Promise\Deferred;

		// Each tick asks the generator for a payload (it receives the topic) and publishes it
		$tick = function () use ($message, $generator, $deferred) {
			$payload = $generator($message->getTopic());

			$this->publish($message->withPayload($payload))
				->then(
					// Successful publishes are reported as progress notifications
					function ($published) use ($deferred) {
						$deferred->notify($published);
					},
					// A failed publish rejects the returned promise
					function (\Exception $failure) use ($deferred) {
						$deferred->reject($failure);
					}
				);
		};

		// Remembered so that handleClose() can cancel it
		$this->timer[] = $this->loop->addPeriodicTimer($interval, $tick);

		return $deferred->promise();
	}
471
472
	/**
473
	 * Establishes a network connection to a server
474
	 *
475
	 * @return Promise\ExtendedPromiseInterface
476
	 */
477
	private function establishConnection() : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		// Kept in a variable so that the timeout below can abort the attempt
		$pending = $this->connector->connect($this->configuration->getUri());

		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred, $pending) {
			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));

			$deferred->reject($exception);

			// Without cancelling, a connection that completes after the timeout
			// would stay open with listeners attached although nobody uses it.
			if ($pending instanceof Promise\CancellablePromiseInterface) {
				$pending->cancel();
			}
		});

		$pending
			->always(function () use ($timer) {
				// The attempt settled one way or the other, the timeout is obsolete
				$this->loop->cancelTimer($timer);
			})
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred) {
				// Wire the stream events to the client's handlers before handing it out
				$stream->on('data', function ($data) {
					$this->handleReceive($data);
				});

				$stream->on('close', function () {
					$this->handleClose();
				});

				$stream->on('error', function (\Exception $ex) {
					$this->handleError($ex);
				});

				$deferred->resolve($stream);
			})
			->otherwise(function (\Exception $ex) use ($deferred) {
				// No effect when the timeout has already rejected the deferred
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
512
513
	/**
514
	 * Registers a new client with the broker
515
	 *
516
	 * @param Mqtt\Connection $connection
517
	 *
518
	 * @return Promise\ExtendedPromiseInterface
519
	 */
520
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		// Give up when the connect flow is not answered within the timeout
		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
			->always(function () use ($responseTimer) {
				$this->loop->cancelTimer($responseTimer);

			})->then(function (Mqtt\Connection $connection) use ($deferred) {
				// NOTE(review): assumes getKeepAlive() returns seconds as a number - confirm
				$keepAlive = $connection->getKeepAlive();

				// MQTT defines a keep-alive of 0 as "mechanism switched off". Scheduling
				// a periodic timer with a zero interval would fire pings back-to-back.
				if ($keepAlive > 0) {
					// Ping at 75 % of the keep-alive; floor() keeps whole seconds but
					// would turn a one-second keep-alive into 0, so that case stays fractional
					$interval = $keepAlive > 1 ? floor($keepAlive * 0.75) : $keepAlive * 0.75;

					// Remembered so that handleClose() can cancel it
					$this->timer[] = $this->loop->addPeriodicTimer($interval, function () {
						$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
					});
				}

				$deferred->resolve($connection);

			})->otherwise(function (\Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
547
548
	/**
549
	 * Handles incoming data
550
	 *
551
	 * @param string $data
552
	 *
553
	 * @return void
554
	 */
555
	private function handleReceive(string $data) : void
	{
		// Data arriving while neither connected nor connecting is ignored
		if (!($this->isConnected || $this->isConnecting)) {
			return;
		}

		$pendingBefore = count($this->receivingFlows);

		foreach ($this->parser->push($data) as $packet) {
			$this->handlePacket($packet);
		}

		// handlePacket() unsets accepted flows, which leaves gaps in the keys
		if (count($this->receivingFlows) < $pendingBefore) {
			$this->receivingFlows = array_values($this->receivingFlows);
		}
	}
573
574
	/**
575
	 * Handles an incoming packet
576
	 *
577
	 * @param Mqtt\Packet $packet
578
	 *
579
	 * @return void
580
	 */
581
	private function handlePacket(Mqtt\Packet $packet) : void
	{
		$type = $packet->getPacketType();

		switch ($type) {
			case Mqtt\Packet::TYPE_PUBLISH:
				// An incoming message always opens a new flow
				/* @var Mqtt\Packet\PublishRequestPacket $packet */
				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow(
					new Mqtt\DefaultMessage(
						$packet->getTopic(),
						$packet->getPayload(),
						$packet->getQosLevel(),
						$packet->isRetained(),
						$packet->isDuplicate()
					),
					$packet->getIdentifier()
				));

				return;

			case Mqtt\Packet::TYPE_CONNACK:
			case Mqtt\Packet::TYPE_PINGRESP:
			case Mqtt\Packet::TYPE_SUBACK:
			case Mqtt\Packet::TYPE_UNSUBACK:
			case Mqtt\Packet::TYPE_PUBREL:
			case Mqtt\Packet::TYPE_PUBACK:
			case Mqtt\Packet::TYPE_PUBREC:
			case Mqtt\Packet::TYPE_PUBCOMP:
				// Responses go to the first waiting flow that accepts them
				foreach ($this->receivingFlows as $key => $waiting) {
					if (!$waiting->accept($packet)) {
						continue;
					}

					unset($this->receivingFlows[$key]);
					$this->continueFlow($waiting, $packet);

					return;
				}

				// Nobody was waiting for this packet
				$this->onWarning(
					new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $type)),
					$this
				);

				return;

			default:
				$this->onWarning(
					new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $type)),
					$this
				);
		}
	}
631
632
	/**
633
	 * Handles outgoing packets
634
	 *
635
	 * @return void
636
	 */
637
	private function handleSend() : void
	{
		// Take over the flow whose packet has just been written
		$written = $this->writtenFlow;
		$this->writtenFlow = NULL;

		// Send the next queued flow, if there is one
		if ($this->sendingFlows !== []) {
			$this->writtenFlow = array_shift($this->sendingFlows);
			$this->stream->write($this->writtenFlow->getPacket());
		}

		if ($written === NULL) {
			return;
		}

		if (!$written->isFinished()) {
			// Still waiting for a response packet
			$this->receivingFlows[] = $written;

			return;
		}

		// Finish on the next tick instead of inside the current call stack
		$this->loop->futureTick(function () use ($written) {
			$this->finishFlow($written);
		});
	}
662
663
	/**
664
	 * Handles closing of the stream
665
	 *
666
	 * @return void
667
	 */
668
	private function handleClose() : void
	{
		foreach ($this->timer as $timer) {
			$this->loop->cancelTimer($timer);
		}

		// Forget the cancelled timers; otherwise the list grows with every reconnect
		$this->timer = [];

		// Pending flows belong to the stream that just went away. Keeping them would
		// let a stale flow (e.g. a connect flow left over from a timed-out attempt)
		// accept the response packets of the next connection.
		$this->receivingFlows = [];
		$this->sendingFlows = [];
		$this->writtenFlow = NULL;

		// Keep the connection for the onClose event before the state is reset
		$connection = $this->connection;

		$this->isConnecting = FALSE;
		$this->isDisconnecting = FALSE;
		$this->isConnected = FALSE;
		$this->connection = NULL;
		$this->stream = NULL;

		// No connection is stored when the stream closes before registration finished
		if ($connection !== NULL) {
			$this->onClose($connection, $this);
		}
	}
686
687
	/**
688
	 * Handles errors of the stream
689
	 *
690
	 * @param \Exception $ex
691
	 *
692
	 * @return void
693
	 */
694
	private function handleError(\Exception $ex) : void
	{
		// Stream errors are surfaced through the public onError event
		$this->onError($ex, $this);
	}
698
699
	/**
700
	 * Starts the given flow
701
	 *
702
	 * @param Mqtt\Flow $flow
703
	 * @param bool $isSilent
704
	 *
705
	 * @return Promise\ExtendedPromiseInterface
706
	 */
707
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
	{
		try {
			$packet = $flow->start();

		} catch (\Exception $ex) {
			$this->onError($ex, $this);

			return new Promise\RejectedPromise($ex);
		}

		$deferred = new Promise\Deferred;
		$envelope = new Flow\Envelope($flow, $deferred, $packet, $isSilent);

		if ($packet === NULL) {
			// Nothing to transmit: finish the flow on the next tick
			$this->loop->futureTick(function () use ($envelope) {
				$this->finishFlow($envelope);
			});

		} elseif ($this->writtenFlow === NULL) {
			// Nothing is being written at the moment, send right away
			$this->stream->write($packet);
			$this->writtenFlow = $envelope;

			$this->handleSend();

		} else {
			// Another flow is being written, wait in the queue
			$this->sendingFlows[] = $envelope;
		}

		return $deferred->promise();
	}
740
741
	/**
742
	 * Continues the given flow
743
	 *
744
	 * @param Flow\Envelope $flow
745
	 * @param Mqtt\Packet $packet
746
	 *
747
	 * @return void
748
	 */
749
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
	{
		try {
			$response = $flow->next($packet);

		} catch (\Exception $ex) {
			$this->onError($ex, $this);

			return;
		}

		if ($response === NULL) {
			// No reply to send; finish the flow if it is done
			if ($flow->isFinished()) {
				$this->loop->futureTick(function () use ($flow) {
					$this->finishFlow($flow);
				});
			}

			return;
		}

		if ($this->writtenFlow !== NULL) {
			// Another flow is being written, wait in the queue
			$this->sendingFlows[] = $flow;

			return;
		}

		$this->stream->write($response);
		$this->writtenFlow = $flow;

		$this->handleSend();
	}
777
778
	/**
779
	 * Finishes the given flow
780
	 *
781
	 * @param Flow\Envelope $flow
782
	 *
783
	 * @return void
784
	 */
785
	private function finishFlow(Flow\Envelope $flow) : void
	{
		if (!$flow->isSuccess()) {
			// Failed flows raise a warning and reject their promise
			$ex = new Exceptions\RuntimeException($flow->getErrorMessage());
			$this->onWarning($ex, $this);

			$flow->getDeferred()->reject($ex);

			return;
		}

		// Silent flows resolve their promise without firing an event
		if (!$flow->isSilent()) {
			$code = $flow->getCode();

			switch ($code) {
				case 'ping':
					$this->onPing($this);
					break;

				case 'pong':
					$this->onPong($this);
					break;

				case 'connect':
				case 'disconnect':
				case 'publish':
				case 'subscribe':
				case 'unsubscribe':
					// These events share one shape: on<Code>(result, client)
					$event = 'on' . ucfirst($code);
					$this->{$event}($flow->getResult(), $this);
					break;

				case 'message':
					/** @var Mqtt\Message $message */
					$message = $flow->getResult();
					$previous = $this->lastMessage;

					// A message repeating the previous topic and payload is not announced again
					$isRepeat = $previous !== NULL
						&& $previous->getTopic() === $message->getTopic()
						&& $previous->getPayload() === $message->getPayload();

					if ($isRepeat) {
						break;
					}

					$this->lastMessage = $message;

					$this->onMessage($message, $this);
					break;
			}
		}

		$flow->getDeferred()->resolve($flow->getResult());
	}
848
849
	/**
850
	 * @return void
851
	 */
852
	private function createConnector() : void
	{
		// Plain TCP is the innermost layer; each enabled feature wraps the previous connector
		$this->connector = new Socket\TcpConnector($this->loop);

		if ($this->configuration->isDNSEnabled()) {
			$resolver = (new Dns\Resolver\Factory)->createCached($this->configuration->getDNSAddress(), $this->loop);

			$this->connector = new Socket\DnsConnector($this->connector, $resolver);
		}

		if ($this->configuration->isSSLEnabled()) {
			$sslOptions = $this->configuration->getSSLConfiguration();

			$this->connector = new Socket\SecureConnector($this->connector, $this->loop, $sslOptions);
		}
	}
866
}
867