Passed
Push — master ( 12828b...dadfd5 )
by Adam
02:24
created

src/IPub/MQTTClient/Client/Client.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Closure;
20
use Exception;
21
22
use Nette;
23
24
use React\EventLoop;
25
use React\Promise;
26
use React\Socket;
27
use React\Stream;
28
29
use BinSoul\Net\Mqtt;
30
31
use IPub\MQTTClient\Configuration;
32
use IPub\MQTTClient\Exceptions;
33
use IPub\MQTTClient\Flow;
34
35
/**
36
 * Connection client
37
 *
38
 * @package        iPublikuj:MQTTClient!
39
 * @subpackage     Client
40
 *
41
 * @author         Adam Kadlec <[email protected]>
42
 *
43
 * @method onStart()
44
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
45
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
46
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
47
 * @method onClose(Mqtt\Connection $connection, IClient $client)
48
 * @method onPing(IClient $client)
49
 * @method onPong(IClient $client)
50
 * @method onPublish(Mqtt\Message $message, IClient $client)
51
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
52
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
53
 * @method onMessage(Mqtt\Message $message, IClient $client)
54
 * @method onWarning(Exception $ex, IClient $client)
55
 * @method onError(Exception $ex, IClient $client)
56
 */
57 1
final class Client implements IClient
58
{
59
	/**
60
	 * Implement nette smart magic
61
	 */
62 1
	use Nette\SmartObject;
63
64
	/**
65
	 * @var Closure[]
66
	 */
67
	public $onStart = [];
68
69
	/**
70
	 * @var Closure[]
71
	 */
72
	public $onOpen = [];
73
74
	/**
75
	 * @var Closure[]
76
	 */
77
	public $onConnect = [];
78
79
	/**
80
	 * @var Closure[]
81
	 */
82
	public $onDisconnect = [];
83
84
	/**
85
	 * @var Closure[]
86
	 */
87
	public $onClose = [];
88
89
	/**
90
	 * @var Closure[]
91
	 */
92
	public $onPing = [];
93
94
	/**
95
	 * @var Closure[]
96
	 */
97
	public $onPong = [];
98
99
	/**
100
	 * @var Closure[]
101
	 */
102
	public $onPublish = [];
103
104
	/**
105
	 * @var Closure[]
106
	 */
107
	public $onSubscribe = [];
108
109
	/**
110
	 * @var Closure[]
111
	 */
112
	public $onUnsubscribe = [];
113
114
	/**
115
	 * @var Closure[]
116
	 */
117
	public $onMessage = [];
118
119
	/**
120
	 * @var Closure[]
121
	 */
122
	public $onWarning = [];
123
124
	/**
125
	 * @var Closure[]
126
	 */
127
	public $onError = [];
128
129
	/**
130
	 * @var EventLoop\LoopInterface
131
	 */
132
	private $loop;
133
134
	/**
135
	 * @var Configuration\Broker
136
	 */
137
	private $configuration;
138
139
	/**
140
	 * @var Socket\ConnectorInterface
141
	 */
142
	private $connector;
143
144
	/**
145
	 * @var Socket\ConnectionInterface|NULL
146
	 */
147
	private $stream = NULL;
148
149
	/**
150
	 * @var Mqtt\Connection
151
	 */
152
	private $connection;
153
154
	/**
155
	 * @var Mqtt\StreamParser
156
	 */
157
	private $parser;
158
159
	/**
160
	 * @var Mqtt\IdentifierGenerator
161
	 */
162
	private $identifierGenerator;
163
164
	/**
165
	 * @var bool
166
	 */
167
	private $isConnected = FALSE;
168
169
	/**
170
	 * @var bool
171
	 */
172
	private $isConnecting = FALSE;
173
174
	/**
175
	 * @var bool
176
	 */
177
	private $isDisconnecting = FALSE;
178
179
	/**
180
	 * @var string
181
	 */
182
	private $host;
183
184
	/**
185
	 * @var int
186
	 */
187
	private $port;
188
189
	/**
190
	 * @var int
191
	 */
192
	private $timeout = 5;
193
194
	/**
195
	 * @var Flow\Envelope[]
196
	 */
197
	private $receivingFlows = [];
198
199
	/**
200
	 * @var Flow\Envelope[]
201
	 */
202
	private $sendingFlows = [];
203
204
	/**
205
	 * @var Flow\Envelope
206
	 */
207
	private $writtenFlow;
208
209
	/**
210
	 * @var EventLoop\TimerInterface[]
211
	 */
212
	private $timer = [];
213
214
	/**
215
	 * @var Mqtt\Message|NULL
216
	 */
217
	private $lastMessage = NULL;
218
219
	/**
	 * Creates the client for the given event loop and broker configuration.
	 *
	 * When no stream parser or identifier generator is supplied, the default
	 * implementations from the MQTT library are created. Parser errors are
	 * forwarded to the onError event of this client.
	 *
	 * @param EventLoop\LoopInterface $eventLoop
	 * @param Configuration\Broker $configuration
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator
	 * @param Mqtt\StreamParser|NULL $parser
	 */
	public function __construct(
		EventLoop\LoopInterface $eventLoop,
		Configuration\Broker $configuration,
		?Mqtt\IdentifierGenerator $identifierGenerator = NULL,
		?Mqtt\StreamParser $parser = NULL
	) {
		$this->loop = $eventLoop;
		$this->configuration = $configuration;

		// Explicit nullable types are used above because implicitly nullable
		// parameters (typed parameter with a NULL default) are deprecated
		$this->parser = $parser ?? new Mqtt\StreamParser;

		$this->parser->onError(function (Exception $ex) {
			$this->onError($ex, $this);
		});

		$this->identifierGenerator = $identifierGenerator ?? new Mqtt\DefaultIdentifierGenerator;
	}
250
251
	/**
	 * {@inheritdoc}
	 *
	 * The loop can only be replaced while the client is neither connected
	 * nor in the middle of connecting.
	 *
	 * @throws Exceptions\LogicException when a connection exists or is being established
	 */
	public function setLoop(EventLoop\LoopInterface $loop) : void
	{
		if ($this->isConnected || $this->isConnecting) {
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
		}

		$this->loop = $loop;
	}
263
264
	/**
	 * {@inheritdoc}
	 *
	 * @return EventLoop\LoopInterface the event loop currently used by this client
	 */
	public function getLoop() : EventLoop\LoopInterface
	{
		return $this->loop;
	}
271
272
	/**
	 * {@inheritdoc}
	 *
	 * The configuration can only be replaced while the client is idle.
	 *
	 * @throws Exceptions\InvalidStateException when the client is connected or connecting
	 */
	public function setConfiguration(Configuration\Broker $configuration) : void
	{
		$isBusy = $this->isConnected() || $this->isConnecting;

		if ($isBusy) {
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
		}

		$this->configuration = $configuration;
	}
285
286
	/**
	 * {@inheritdoc}
	 *
	 * The value is read from the broker configuration.
	 *
	 * @throws Exceptions\InvalidStateException
	 */
	public function getUri() : string
	{
		$uri = $this->configuration->getUri();

		return $uri;
	}
295
296
	/**
	 * {@inheritdoc}
	 *
	 * The value is read from the broker configuration.
	 */
	public function getPort() : int
	{
		$port = $this->configuration->getPort();

		return $port;
	}
303
304
	/**
	 * {@inheritdoc}
	 *
	 * @return bool TRUE once the broker has accepted the connection, FALSE otherwise
	 */
	public function isConnected() : bool
	{
		return $this->isConnected;
	}
311
312
	/**
	 * {@inheritdoc}
	 *
	 * Opens the network stream and then registers the client with the broker.
	 * The returned promise resolves with the MQTT connection once both steps
	 * succeeded and is rejected with the exception of whichever step failed.
	 *
	 * @throws Exceptions\InvalidStateException
	 */
	public function connect() : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected || $this->isConnecting) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
		}

		$this->createConnector();

		$this->onStart();

		$this->isConnecting = TRUE;
		$this->isConnected = FALSE;

		$connection = $this->configuration->getConnection();

		// An empty client identifier is replaced by a generated one
		if ($connection->getClientID() === '') {
			$connection->setClientID($this->identifierGenerator->generateClientID());
		}

		$deferred = new Promise\Deferred;

		// Broker accepted the connection
		$onRegistered = function (Mqtt\Connection $registered) use ($deferred) {
			$this->isConnecting = FALSE;
			$this->isConnected = TRUE;

			$this->connection = $registered;

			$this->onConnect($registered, $this);

			$deferred->resolve($this->connection);
		};

		// Stream is open but the broker registration failed
		$onRegistrationFailed = function (Exception $ex) use ($deferred, $connection) {
			$this->isConnecting = FALSE;

			$this->onError($ex, $this);

			$deferred->reject($ex);

			if ($this->stream !== NULL) {
				$this->stream->close();
			}

			$this->onClose($connection, $this);
		};

		// Network stream has been opened
		$onStreamOpened = function (Socket\ConnectionInterface $stream) use ($connection, $onRegistered, $onRegistrationFailed) {
			$this->stream = $stream;

			$this->onOpen($connection, $this);

			$this->registerClient($connection)
				->then($onRegistered)
				->otherwise($onRegistrationFailed);
		};

		// Network stream could not be opened
		$onStreamFailed = function (Exception $ex) use ($deferred) {
			$this->isConnecting = FALSE;

			$this->onError($ex, $this);

			$deferred->reject($ex);
		};

		$this->establishConnection()
			->then($onStreamOpened)
			->otherwise($onStreamFailed);

		return $deferred->promise();
	}
379
380
	/**
	 * {@inheritdoc}
	 *
	 * Sends a silent disconnect flow to the broker. On success the stream is
	 * closed and the promise resolves with the connection; on failure the
	 * promise is rejected with the current connection object.
	 */
	public function disconnect() : Promise\ExtendedPromiseInterface
	{
		if ($this->isDisconnecting || !$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$this->isDisconnecting = TRUE;

		$deferred = new Promise\Deferred;

		$onDisconnected = function (Mqtt\Connection $connection) use ($deferred) {
			$this->isDisconnecting = FALSE;
			$this->isConnected = FALSE;

			$this->onDisconnect($connection, $this);

			$deferred->resolve($connection);

			if ($this->stream !== NULL) {
				$this->stream->close();
			}
		};

		$onFailed = function () use ($deferred) {
			$this->isDisconnecting = FALSE;
			$deferred->reject($this->connection);
		};

		$flow = new Mqtt\Flow\OutgoingDisconnectFlow($this->connection);

		$this->startFlow($flow, TRUE)
			->then($onDisconnected)
			->otherwise($onFailed);

		return $deferred->promise();
	}
413
414
	/**
	 * {@inheritdoc}
	 *
	 * Starts an outgoing subscribe flow for the single given subscription.
	 * The promise is rejected immediately when the client is not connected.
	 */
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
425
426
	/**
	 * {@inheritdoc}
	 *
	 * Starts an outgoing unsubscribe flow for the single given subscription.
	 * The promise is rejected immediately when the client is not connected.
	 */
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
437
438
	/**
	 * {@inheritdoc}
	 *
	 * Starts an outgoing publish flow for the given message.
	 * The promise is rejected immediately when the client is not connected.
	 */
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
449
450
	/**
	 * {@inheritdoc}
	 *
	 * Registers a periodic timer which, on every tick, calls the generator
	 * with the message topic and publishes the message with the generated
	 * payload. Each successful publish is reported through the progress
	 * notification of the returned promise; a failed publish rejects it.
	 */
	public function publishPeriodically(
		int $interval,
		Mqtt\Message $message,
		callable $generator
	) : Promise\ExtendedPromiseInterface {
		if (!$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$deferred = new Promise\Deferred;

		$onPublished = function ($value) use ($deferred) {
			$deferred->notify($value);
		};

		$onFailed = function (Exception $e) use ($deferred) {
			$deferred->reject($e);
		};

		$onTick = function () use ($message, $generator, $onPublished, $onFailed) {
			$payload = $generator($message->getTopic());

			$this->publish($message->withPayload($payload))
				->then($onPublished, $onFailed);
		};

		// Remember the timer so that it is cancelled when the stream closes
		$this->timer[] = $this->loop->addPeriodicTimer($interval, $onTick);

		return $deferred->promise();
	}
478
479
	/**
	 * Establishes a network connection to a server
	 *
	 * Asks the connector to open the configured URI and guards the attempt
	 * with a timer of $this->timeout seconds. When the timer fires, the
	 * returned promise is rejected and the pending connection attempt is
	 * cancelled, so that no stream is left open behind a failed connect.
	 * On success the data, close and error listeners are attached to the
	 * stream before the promise is resolved with it.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 *
	 * @throws Exceptions\InvalidStateException
	 */
	private function establishConnection() : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;
		$timedOut = FALSE;

		$pending = $this->connector->connect($this->configuration->getUri());

		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred, $pending, &$timedOut) {
			$timedOut = TRUE;

			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));

			$deferred->reject($exception);

			// Abort the attempt which is still in progress; without this a late
			// connection would stay open although the caller was told it failed
			if ($pending instanceof Promise\CancellablePromiseInterface) {
				$pending->cancel();
			}
		});

		$pending
			->always(function () use ($timer) {
				$this->loop->cancelTimer($timer);
			})
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred, &$timedOut) {
				// The attempt could not be cancelled and succeeded after the timeout:
				// the promise is already rejected, so the stream must not be kept
				if ($timedOut) {
					$stream->close();

					return;
				}

				$stream->on('data', function ($data) {
					$this->handleReceive($data);
				});

				$stream->on('close', function () {
					$this->handleClose();
				});

				$stream->on('error', function (Exception $ex) {
					$this->handleError($ex);
				});

				$deferred->resolve($stream);
			})
			->otherwise(function (Exception $ex) use ($deferred) {
				// No effect when the timeout has already rejected the promise
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
521
522
	/**
	 * Registers a new client with the broker
	 *
	 * Starts a silent connect flow and rejects the returned promise when no
	 * response arrives within $this->timeout seconds. After a successful
	 * registration a periodic ping is scheduled at 75 % of the keep alive
	 * interval. No ping timer is created when the keep alive is zero, which
	 * in MQTT means the keep alive mechanism is turned off.
	 *
	 * @param Mqtt\Connection $connection
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
			->always(function () use ($responseTimer) {
				$this->loop->cancelTimer($responseTimer);

			})->then(function (Mqtt\Connection $connection) use ($deferred) {
				$keepAlive = $connection->getKeepAlive();

				if ($keepAlive > 0) {
					// Never go below one second: floor() of a keep alive of 1 would give
					// a zero interval and the timer would fire on every loop iteration
					$interval = max(1.0, floor($keepAlive * 0.75));

					$this->timer[] = $this->loop->addPeriodicTimer($interval, function () {
						$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
					});
				}

				$deferred->resolve($connection);

			})->otherwise(function (Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
556
557
	/**
	 * Handles incoming data
	 *
	 * Feeds the raw data into the stream parser and processes every packet
	 * the parser returns. Data is ignored while the client is neither
	 * connected nor connecting.
	 *
	 * @param string $data
	 *
	 * @return void
	 */
	private function handleReceive(string $data) : void
	{
		if (!($this->isConnected || $this->isConnecting)) {
			return;
		}

		$pendingBefore = count($this->receivingFlows);

		foreach ($this->parser->push($data) as $packet) {
			$this->handlePacket($packet);
		}

		// Flows were removed while handling the packets, close the gaps in the keys
		if (count($this->receivingFlows) < $pendingBefore) {
			$this->receivingFlows = array_values($this->receivingFlows);
		}
	}
582
583
	/**
	 * Handles an incoming packet
	 *
	 * A publish packet starts a new incoming publish flow. Acknowledgement
	 * and response packets are handed to the first waiting flow which
	 * accepts them. A packet nobody waits for, or of any other type, only
	 * triggers the onWarning event.
	 *
	 * @param Mqtt\Packet $packet
	 *
	 * @return void
	 */
	private function handlePacket(Mqtt\Packet $packet) : void
	{
		switch ($packet->getPacketType()) {
			case Mqtt\Packet::TYPE_PUBLISH:
				/* @var Mqtt\Packet\PublishRequestPacket $packet */
				$message = new Mqtt\DefaultMessage(
					$packet->getTopic(),
					$packet->getPayload(),
					$packet->getQosLevel(),
					$packet->isRetained(),
					$packet->isDuplicate()
				);

				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($message, $packet->getIdentifier()));
				break;

			case Mqtt\Packet::TYPE_CONNACK:
			case Mqtt\Packet::TYPE_PINGRESP:
			case Mqtt\Packet::TYPE_SUBACK:
			case Mqtt\Packet::TYPE_UNSUBACK:
			case Mqtt\Packet::TYPE_PUBREL:
			case Mqtt\Packet::TYPE_PUBACK:
			case Mqtt\Packet::TYPE_PUBREC:
			case Mqtt\Packet::TYPE_PUBCOMP:
				// Look up the first waiting flow which accepts this packet
				$matchedIndex = NULL;

				foreach ($this->receivingFlows as $index => $candidate) {
					if ($candidate->accept($packet)) {
						$matchedIndex = $index;

						break;
					}
				}

				if ($matchedIndex === NULL) {
					$ex = new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $packet->getPacketType()));

					$this->onWarning($ex, $this);
					break;
				}

				$matchedFlow = $this->receivingFlows[$matchedIndex];

				unset($this->receivingFlows[$matchedIndex]);
				$this->continueFlow($matchedFlow, $packet);
				break;

			default:
				$ex = new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $packet->getPacketType()));

				$this->onWarning($ex, $this);
		}
	}
640
641
	/**
	 * Handles outgoing packets
	 *
	 * Takes over the flow which has just been written, writes the next
	 * queued flow (if any) and then either finishes the written flow on the
	 * next loop tick or parks it in the list of flows awaiting a response.
	 *
	 * @return void
	 */
	private function handleSend() : void
	{
		$written = $this->writtenFlow;
		$this->writtenFlow = NULL;

		if ($this->sendingFlows !== []) {
			$this->writtenFlow = array_shift($this->sendingFlows);
			$this->stream->write($this->writtenFlow->getPacket());
		}

		if ($written === NULL) {
			return;
		}

		if (!$written->isFinished()) {
			$this->receivingFlows[] = $written;

			return;
		}

		$this->loop->futureTick(function () use ($written) {
			$this->finishFlow($written);
		});
	}
671
672
	/**
	 * Handles closing of the stream
	 *
	 * Cancels and forgets all timers registered by this client, resets the
	 * connection state and fires the onClose event when a broker connection
	 * had been established.
	 *
	 * @return void
	 */
	private function handleClose() : void
	{
		foreach ($this->timer as $timer) {
			$this->loop->cancelTimer($timer);
		}

		// Drop the cancelled timers, otherwise the list keeps growing with
		// every reconnect and dead timers are cancelled again on each close
		$this->timer = [];

		$connection = $this->connection;

		$this->isConnecting = FALSE;
		$this->isDisconnecting = FALSE;
		$this->isConnected = FALSE;
		$this->connection = NULL;
		$this->stream = NULL;

		if ($connection !== NULL) {
			$this->onClose($connection, $this);
		}
	}
695
696
	/**
	 * Handles errors of the stream
	 *
	 * The exception is passed on to the onError event together with this client.
	 *
	 * @param Exception $ex
	 *
	 * @return void
	 */
	private function handleError(Exception $ex) : void
	{
		$client = $this;

		$this->onError($ex, $client);
	}
707
708
	/**
	 * Starts the given flow
	 *
	 * The flow is wrapped into an envelope together with a new deferred.
	 * When starting the flow produces a packet, the packet is written to the
	 * stream, or queued while another flow is being written. A flow without
	 * a packet is finished on the next loop tick. An exception thrown while
	 * starting the flow is reported through onError and rejects the promise.
	 *
	 * @param Mqtt\Flow $flow
	 * @param bool $isSilent
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
	{
		try {
			$packet = $flow->start();

		} catch (Exception $ex) {
			$this->onError($ex, $this);

			return new Promise\RejectedPromise($ex);
		}

		$deferred = new Promise\Deferred;
		$envelope = new Flow\Envelope($flow, $deferred, $packet, $isSilent);

		if ($packet === NULL) {
			// Nothing to send, the flow is complete right away
			$this->loop->futureTick(function () use ($envelope) {
				$this->finishFlow($envelope);
			});

			return $deferred->promise();
		}

		if ($this->writtenFlow === NULL) {
			$this->stream->write($packet);
			$this->writtenFlow = $envelope;

			$this->handleSend();

		} else {
			$this->sendingFlows[] = $envelope;
		}

		return $deferred->promise();
	}
749
750
	/**
	 * Continues the given flow
	 *
	 * Passes the received packet to the flow. A response packet returned by
	 * the flow is written to the stream, or queued while another flow is
	 * being written. Without a response, a finished flow is completed on the
	 * next loop tick. An exception thrown by the flow is reported through
	 * onError and the flow is not processed any further.
	 *
	 * @param Flow\Envelope $flow
	 * @param Mqtt\Packet $packet
	 *
	 * @return void
	 */
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
	{
		try {
			$response = $flow->next($packet);

		} catch (Exception $ex) {
			$this->onError($ex, $this);

			return;
		}

		if ($response === NULL) {
			if ($flow->isFinished()) {
				$this->loop->futureTick(function () use ($flow) {
					$this->finishFlow($flow);
				});
			}

			return;
		}

		if ($this->writtenFlow === NULL) {
			$this->stream->write($response);
			$this->writtenFlow = $flow;

			$this->handleSend();

		} else {
			$this->sendingFlows[] = $flow;
		}
	}
786
787
	/**
	 * Finishes the given flow
	 *
	 * A failed flow fires onWarning and rejects its deferred with a runtime
	 * exception carrying the flow error message. A successful flow fires the
	 * event matching its code, unless the flow is silent, and resolves its
	 * deferred with the flow result. An incoming message with the same topic
	 * and payload as the previously delivered one does not fire onMessage.
	 *
	 * @param Flow\Envelope $flow
	 *
	 * @return void
	 */
	private function finishFlow(Flow\Envelope $flow) : void
	{
		if (!$flow->isSuccess()) {
			$ex = new Exceptions\RuntimeException($flow->getErrorMessage());
			$this->onWarning($ex, $this);

			$flow->getDeferred()->reject($ex);

			return;
		}

		if (!$flow->isSilent()) {
			switch ($flow->getCode()) {
				case 'ping':
					$this->onPing($this);
					break;

				case 'pong':
					$this->onPong($this);
					break;

				case 'connect':
					$this->onConnect($flow->getResult(), $this);
					break;

				case 'disconnect':
					$this->onDisconnect($flow->getResult(), $this);
					break;

				case 'publish':
					$this->onPublish($flow->getResult(), $this);
					break;

				case 'subscribe':
					$this->onSubscribe($flow->getResult(), $this);
					break;

				case 'unsubscribe':
					$this->onUnsubscribe($flow->getResult(), $this);
					break;

				case 'message':
					/** @var Mqtt\Message $message */
					$message = $flow->getResult();

					$isRepeated = $this->lastMessage !== NULL
						&& $this->lastMessage->getTopic() === $message->getTopic()
						&& $this->lastMessage->getPayload() === $message->getPayload();

					// Same topic and payload as the last delivered message, stay quiet
					if ($isRepeated) {
						break;
					}

					$this->lastMessage = $message;

					$this->onMessage($message, $this);
					break;
			}
		}

		$flow->getDeferred()->resolve($flow->getResult());
	}
857
858
	/**
	 * Creates the socket connector used to reach the broker.
	 *
	 * With SSL enabled in the configuration a TCP connector wrapped into a
	 * secure connector with the configured SSL options is used, otherwise
	 * the generic socket connector. Only the connector which is actually
	 * needed gets created.
	 *
	 * @return void
	 */
	private function createConnector() : void
	{
		if ($this->configuration->isSSLEnabled()) {
			$tcpConnector = new Socket\TcpConnector($this->loop);

			$this->connector = new Socket\SecureConnector($tcpConnector, $this->loop, $this->configuration->getSSLConfiguration());

		} else {
			$this->connector = new Socket\Connector($this->loop);
		}
	}
870
}
871