Completed
Push — master ( f55360...030313 )
by Adam
01:51
created

Client::disconnect()   A

Complexity

Conditions 4
Paths 2

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 30
ccs 0
cts 18
cp 0
rs 9.44
c 0
b 0
f 0
cc 4
nc 2
nop 0
crap 20
1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Nette;
20
21
use React\EventLoop;
22
use React\Dns;
23
use React\Promise;
24
use React\Socket;
25
use React\Stream;
26
27
use BinSoul\Net\Mqtt;
28
29
use IPub\MQTTClient\Exceptions;
30
use IPub\MQTTClient\Flow;
31
32
/**
33
 * Connection client
34
 *
35
 * @package        iPublikuj:MQTTClient!
36
 * @subpackage     Client
37
 *
38
 * @author         Adam Kadlec http://www.ipublikuj.eu
39
 *
40
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
41
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
42
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
43
 * @method onClose(Mqtt\Connection $connection, IClient $client)
44
 * @method onPing(IClient $client)
45
 * @method onPong(IClient $client)
46
 * @method onPublish(Mqtt\Message $message, IClient $client)
47
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
48
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
49
 * @method onMessage(Mqtt\Message $message, IClient $client)
50
 * @method onWarning(\Exception $ex, IClient $client)
51
 * @method onError(\Exception $ex, IClient $client)
52
 */
53 1
final class Client implements IClient
54
{
55
	/**
56
	 * Implement nette smart magic
57
	 */
58 1
	use Nette\SmartObject;
59
60
	/**
61
	 * @var callable[]
62
	 */
63
	public $onOpen = [];
64
65
	/**
66
	 * @var callable[]
67
	 */
68
	public $onConnect = [];
69
70
	/**
71
	 * @var callable[]
72
	 */
73
	public $onDisconnect = [];
74
75
	/**
76
	 * @var callable[]
77
	 */
78
	public $onClose = [];
79
80
	/**
81
	 * @var callable[]
82
	 */
83
	public $onPing = [];
84
85
	/**
86
	 * @var callable[]
87
	 */
88
	public $onPong = [];
89
90
	/**
91
	 * @var callable[]
92
	 */
93
	public $onPublish = [];
94
95
	/**
96
	 * @var callable[]
97
	 */
98
	public $onSubscribe = [];
99
100
	/**
101
	 * @var callable[]
102
	 */
103
	public $onUnsubscribe = [];
104
105
	/**
106
	 * @var callable[]
107
	 */
108
	public $onMessage = [];
109
110
	/**
111
	 * @var callable[]
112
	 */
113
	public $onWarning = [];
114
115
	/**
116
	 * @var callable[]
117
	 */
118
	public $onError = [];
119
120
	/**
121
	 * @var EventLoop\LoopInterface
122
	 */
123
	private $loop;
124
125
	/**
126
	 * @var Configuration
127
	 */
128
	private $configuration;
129
130
	/**
131
	 * @var Socket\ConnectorInterface
132
	 */
133
	private $connector;
134
135
	/**
136
	 * @var Socket\ConnectionInterface|NULL
137
	 */
138
	private $stream = NULL;
139
140
	/**
141
	 * @var Mqtt\Connection|NULL
142
	 */
143
	private $connection;
144
145
	/**
146
	 * @var Mqtt\StreamParser
147
	 */
148
	private $parser;
149
150
	/**
151
	 * @var Mqtt\IdentifierGenerator
152
	 */
153
	private $identifierGenerator;
154
155
	/**
156
	 * @var bool
157
	 */
158
	private $isConnected = FALSE;
159
160
	/**
161
	 * @var bool
162
	 */
163
	private $isConnecting = FALSE;
164
165
	/**
166
	 * @var bool
167
	 */
168
	private $isDisconnecting = FALSE;
169
170
	/**
171
	 * @var string
172
	 */
173
	private $host;
174
175
	/**
176
	 * @var int
177
	 */
178
	private $port;
179
180
	/**
181
	 * @var int
182
	 */
183
	private $timeout = 5;
184
185
	/**
186
	 * @var Flow\Envelope[]
187
	 */
188
	private $receivingFlows = [];
189
190
	/**
191
	 * @var Flow\Envelope[]
192
	 */
193
	private $sendingFlows = [];
194
195
	/**
196
	 * @var Flow\Envelope|NULL
197
	 */
198
	private $writtenFlow;
199
200
	/**
201
	 * @var EventLoop\Timer\TimerInterface[]
202
	 */
203
	private $timer = [];
204
205
	/**
206
	 * @param EventLoop\LoopInterface $eventLoop
207
	 * @param Configuration $configuration
208
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator
209
	 * @param Mqtt\StreamParser|NULL $parser
210
	 */
211
	public function __construct(
		EventLoop\LoopInterface $eventLoop,
		Configuration $configuration,
		?Mqtt\IdentifierGenerator $identifierGenerator = NULL,
		?Mqtt\StreamParser $parser = NULL
	) {
		// Optional collaborators are declared explicitly nullable; relying on the
		// implicit nullability of a "= NULL" default is deprecated since PHP 8.4.
		$this->loop = $eventLoop;
		$this->configuration = $configuration;

		// Needs both the loop and the configuration, so it runs after they are stored
		$this->createConnector();

		// Fall back to the library's stream parser when none was injected
		$this->parser = $parser ?? new Mqtt\StreamParser;

		// Parser failures are forwarded to the client's onError event
		$this->parser->onError(function (\Exception $ex) {
			$this->onError($ex, $this);
		});

		// Fall back to the library's default identifier generator when none was injected
		$this->identifierGenerator = $identifierGenerator ?? new Mqtt\DefaultIdentifierGenerator;
	}
238
239
	/**
240
	 * {@inheritdoc}
241
	 */
242
	public function setLoop(EventLoop\LoopInterface $loop) : void
	{
		// The loop may only be replaced while the client is neither connected nor connecting
		if ($this->isConnected || $this->isConnecting) {
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
		}

		$this->loop = $loop;

		// The connector is constructed with the loop, so it is rebuilt for the new one
		$this->createConnector();
	}
253
254
	/**
255
	 * {@inheritdoc}
256
	 */
257
	public function getLoop() : EventLoop\LoopInterface
	{
		// Event loop currently used by this client for timers and connectors
		$currentLoop = $this->loop;

		return $currentLoop;
	}
261
262
	/**
263
	 * {@inheritdoc}
264
	 */
265
	public function setConfiguration(Configuration $configuration) : void
	{
		// Configuration can only be replaced while the client is idle
		if ($this->isConnected() || $this->isConnecting) {
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
		}

		$this->configuration = $configuration;

		// createConnector() reads the DNS and SSL settings from the configuration,
		// so the connector has to be rebuilt or the previous settings would stay in effect.
		$this->createConnector();
	}
273
274
	/**
275
	 * {@inheritdoc}
276
	 */
277
	public function getUri() : string
	{
		// Delegates to the active configuration
		$uri = $this->configuration->getUri();

		return $uri;
	}
281
282
	/**
283
	 * {@inheritdoc}
284
	 */
285
	public function getPort() : int
	{
		// Delegates to the active configuration
		$port = $this->configuration->getPort();

		return $port;
	}
289
290
	/**
291
	 * {@inheritdoc}
292
	 */
293
	public function isConnected() : bool
	{
		// Flag is raised in connect() after registration and cleared on disconnect/close
		$connected = $this->isConnected;

		return $connected;
	}
297
298
	/**
299
	 * {@inheritdoc}
300
	 */
301
	public function connect() : Promise\ExtendedPromiseInterface
	{
		// Only one connection attempt / connection at a time
		if ($this->isConnecting || $this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
		}

		$this->isConnecting = TRUE;
		$this->isConnected = FALSE;

		$connection = $this->configuration->getConnection();

		// An empty client ID is replaced by a generated one
		if ($connection->getClientID() === '') {
			$connection = $connection->withClientID($this->identifierGenerator->generateClientID());
		}

		$pending = new Promise\Deferred;

		// Step 2: the socket is open, register the client with the broker
		$onStreamOpened = function (Socket\ConnectionInterface $stream) use ($connection, $pending) {
			$this->stream = $stream;

			$this->onOpen($connection, $this);

			$onRegistered = function (Mqtt\Connection $connection) use ($pending) {
				$this->isConnecting = FALSE;
				$this->isConnected = TRUE;

				$this->connection = $connection;

				$this->onConnect($connection, $this);

				$pending->resolve($this->connection);
			};

			$onRegistrationFailed = function (\Exception $reason) use ($stream, $pending, $connection) {
				$this->isConnecting = FALSE;

				$this->onError($reason, $this);

				$pending->reject($reason);

				// The stream may already have been released by the close handler
				if ($this->stream !== NULL) {
					$this->stream->close();
				}

				$this->onClose($connection, $this);
			};

			$this->registerClient($connection)
				->then($onRegistered)
				->otherwise($onRegistrationFailed);
		};

		// The socket could not be opened (or an earlier step threw)
		$onStreamFailed = function (\Exception $reason) use ($pending) {
			$this->isConnecting = FALSE;

			$this->onError($reason, $this);

			$pending->reject($reason);
		};

		// Step 1: open the network connection
		$this->establishConnection()
			->then($onStreamOpened)
			->otherwise($onStreamFailed);

		return $pending->promise();
	}
359
360
	/**
361
	 * {@inheritdoc}
362
	 */
363
	public function disconnect() : Promise\ExtendedPromiseInterface
	{
		// Nothing to do when not connected or when a disconnect is already in flight
		if ($this->isDisconnecting || !$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$this->isDisconnecting = TRUE;

		$pending = new Promise\Deferred;

		$onDisconnected = function (Mqtt\Connection $connection) use ($pending) {
			$this->isDisconnecting = FALSE;
			$this->isConnected = FALSE;

			$this->onDisconnect($connection, $this);

			$pending->resolve($connection);

			// Close the socket last, after listeners and the promise were notified
			if ($this->stream !== NULL) {
				$this->stream->close();
			}
		};

		$onFailed = function () use ($pending) {
			$this->isDisconnecting = FALSE;

			// NOTE(review): the rejection reason is the connection object, not the
			// error that caused the failure - confirm callers rely on this.
			$pending->reject($this->connection);
		};

		// The flow is started silently (second argument TRUE)
		$this->startFlow(new Mqtt\Flow\OutgoingDisconnectFlow($this->connection), TRUE)
			->then($onDisconnected)
			->otherwise($onFailed);

		return $pending->promise();
	}
393
394
	/**
395
	 * {@inheritdoc}
396
	 */
397 View Code Duplication
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			// The flow takes a list of subscriptions; a single one is wrapped in an array
			$flow = new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
405
406
	/**
407
	 * {@inheritdoc}
408
	 */
409 View Code Duplication
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			// The flow takes a list of subscriptions; a single one is wrapped in an array
			$flow = new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
417
418
	/**
419
	 * {@inheritdoc}
420
	 */
421
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		// Publishing requires an established connection
		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
429
430
	/**
431
	 * {@inheritdoc}
432
	 */
433
	public function publishPeriodically(
		int $interval,
		Mqtt\Message $message,
		callable $generator
	) : Promise\ExtendedPromiseInterface {
		if (!$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$pending = new Promise\Deferred;

		// Executed on every tick: build a fresh payload for the topic and publish it
		$tick = function () use ($message, $generator, $pending) {
			$payload = $generator($message->getTopic());

			$this->publish($message->withPayload($payload))->then(
				// Each successful publish is reported as progress on the returned promise
				function ($value) use ($pending) {
					$pending->notify($value);
				},
				function (\Exception $e) use ($pending) {
					$pending->reject($e);
				}
			);
		};

		// The timer is remembered so that handleClose() can cancel it
		$this->timer[] = $this->loop->addPeriodicTimer($interval, $tick);

		return $pending->promise();
	}
458
459
	/**
460
	 * Establishes a network connection to a server
461
	 *
462
	 * @return Promise\ExtendedPromiseInterface
463
	 */
464
	private function establishConnection() : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		// Set once the timeout has fired, so a socket that connects afterwards is discarded
		$timedOut = FALSE;

		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred, &$timedOut) {
			$timedOut = TRUE;

			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		$this->connector->connect($this->configuration->getUri())
			->always(function () use ($timer) {
				// The attempt has settled either way, the timeout is no longer needed
				$this->loop->cancelTimer($timer);
			})
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred, &$timedOut) {
				if ($timedOut) {
					// The caller has already been told the attempt failed. Close the late
					// socket instead of wiring it up; otherwise it would stay open and
					// keep feeding data into this client.
					$stream->close();

					return;
				}

				$stream->on('data', function ($data) {
					$this->handleReceive($data);
				});

				$stream->on('close', function () {
					$this->handleClose();
				});

				$stream->on('error', function (\Exception $ex) {
					$this->handleError($ex);
				});

				$deferred->resolve($stream);
			})
			->otherwise(function (\Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
499
500
	/**
501
	 * Registers a new client with the broker
502
	 *
503
	 * @param Mqtt\Connection $connection
504
	 *
505
	 * @return Promise\ExtendedPromiseInterface
506
	 */
507
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		// Give up when the connect flow does not complete within the timeout
		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		// The connect flow is started silently (second argument TRUE)
		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
			->always(function () use ($responseTimer) {
				$this->loop->cancelTimer($responseTimer);

			})->then(function (Mqtt\Connection $connection) use ($deferred) {
				$keepAlive = $connection->getKeepAlive();

				// A keep-alive of 0 switches the mechanism off in MQTT; scheduling a
				// 0-second periodic timer would flood the broker with pings instead.
				if ($keepAlive > 0) {
					// Ping at 75 % of the keep-alive interval, rounded down to whole seconds
					$interval = floor($keepAlive * 0.75);

					if ($interval < 1) {
						// Very small keep-alive values would floor to 0; keep the fraction
						$interval = $keepAlive * 0.75;
					}

					$this->timer[] = $this->loop->addPeriodicTimer($interval, function () {
						$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
					});
				}

				$deferred->resolve($connection);

			})->otherwise(function (\Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
534
535
	/**
536
	 * Handles incoming data
537
	 *
538
	 * @param string $data
539
	 *
540
	 * @return void
541
	 */
542
	private function handleReceive(string $data) : void
	{
		// Data arriving while the client is neither connected nor connecting is dropped
		if (!($this->isConnected || $this->isConnecting)) {
			return;
		}

		$pendingBefore = count($this->receivingFlows);

		// The parser turns the raw bytes into zero or more packets
		foreach ($this->parser->push($data) as $packet) {
			$this->handlePacket($packet);
		}

		// handlePacket() unsets matched flows; re-index the list once some were removed
		if (count($this->receivingFlows) < $pendingBefore) {
			$this->receivingFlows = array_values($this->receivingFlows);
		}
	}
560
561
	/**
562
	 * Handles an incoming packet
563
	 *
564
	 * @param Mqtt\Packet $packet
565
	 *
566
	 * @return void
567
	 */
568
	private function handlePacket(Mqtt\Packet $packet) : void
	{
		switch ($packet->getPacketType()) {
			case Mqtt\Packet::TYPE_PUBLISH:
				// An incoming publish starts a new flow of its own
				/* @var Mqtt\Packet\PublishRequestPacket $packet */
				$message = new Mqtt\DefaultMessage(
					$packet->getTopic(),
					$packet->getPayload(),
					$packet->getQosLevel(),
					$packet->isRetained(),
					$packet->isDuplicate()
				);

				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($message, $packet->getIdentifier()));

				return;

			case Mqtt\Packet::TYPE_CONNACK:
			case Mqtt\Packet::TYPE_PINGRESP:
			case Mqtt\Packet::TYPE_SUBACK:
			case Mqtt\Packet::TYPE_UNSUBACK:
			case Mqtt\Packet::TYPE_PUBREL:
			case Mqtt\Packet::TYPE_PUBACK:
			case Mqtt\Packet::TYPE_PUBREC:
			case Mqtt\Packet::TYPE_PUBCOMP:
				// These packets belong to a flow waiting for a response; the first
				// flow accepting the packet is taken off the list and continued.
				foreach ($this->receivingFlows as $position => $candidate) {
					if (!$candidate->accept($packet)) {
						continue;
					}

					unset($this->receivingFlows[$position]);
					$this->continueFlow($candidate, $packet);

					return;
				}

				// No waiting flow accepted the packet
				$this->onWarning(
					new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $packet->getPacketType())),
					$this
				);

				return;

			default:
				$this->onWarning(
					new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $packet->getPacketType())),
					$this
				);
		}
	}
618
619
	/**
620
	 * Handles outgoing packets
621
	 *
622
	 * @return void
623
	 */
624
	private function handleSend() : void
	{
		// Take over the flow whose packet was just written (may be NULL)
		$written = $this->writtenFlow;
		$this->writtenFlow = NULL;

		// Send the next queued flow, if there is one
		if ($this->sendingFlows !== []) {
			$this->writtenFlow = array_shift($this->sendingFlows);
			$this->stream->write($this->writtenFlow->getPacket());
		}

		if ($written === NULL) {
			return;
		}

		if (!$written->isFinished()) {
			// The flow expects a response packet, park it until one arrives
			$this->receivingFlows[] = $written;

			return;
		}

		// Completion is deferred to the next loop tick
		$this->loop->nextTick(function () use ($written) {
			$this->finishFlow($written);
		});
	}
649
650
	/**
651
	 * Handles closing of the stream
652
	 *
653
	 * @return void
654
	 */
655
	private function handleClose() : void
	{
		foreach ($this->timer as $timer) {
			$this->loop->cancelTimer($timer);
		}

		// Forget the cancelled timers; otherwise the list grows with every
		// reconnect and stale timers are cancelled again on each later close.
		$this->timer = [];

		// Remember the connection before the state is reset, listeners still need it
		$connection = $this->connection;

		$this->isConnecting = FALSE;
		$this->isDisconnecting = FALSE;
		$this->isConnected = FALSE;
		$this->connection = NULL;
		$this->stream = NULL;

		// onClose is only raised when a connection had actually been registered
		if ($connection !== NULL) {
			$this->onClose($connection, $this);
		}
	}
673
674
	/**
675
	 * Handles errors of the stream
676
	 *
677
	 * @param \Exception $ex
678
	 *
679
	 * @return void
680
	 */
681
	private function handleError(\Exception $ex) : void
	{
		// Stream errors are forwarded unchanged to the onError event
		$client = $this;

		$this->onError($ex, $client);
	}
685
686
	/**
687
	 * Starts the given flow
688
	 *
689
	 * @param Mqtt\Flow $flow
690
	 * @param bool $isSilent
691
	 *
692
	 * @return Promise\ExtendedPromiseInterface
693
	 */
694
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
	{
		try {
			$firstPacket = $flow->start();

		} catch (\Exception $failure) {
			// A flow that cannot even start is reported and rejected immediately
			$this->onError($failure, $this);

			return new Promise\RejectedPromise($failure);
		}

		$deferred = new Promise\Deferred;
		$envelope = new Flow\Envelope($flow, $deferred, $firstPacket, $isSilent);

		if ($firstPacket === NULL) {
			// Nothing to send: the flow is finished on the next loop tick
			$this->loop->nextTick(function () use ($envelope) {
				$this->finishFlow($envelope);
			});

		} elseif ($this->writtenFlow !== NULL) {
			// Another flow is currently being written, queue this one behind it
			$this->sendingFlows[] = $envelope;

		} else {
			$this->stream->write($firstPacket);
			$this->writtenFlow = $envelope;

			$this->handleSend();
		}

		return $deferred->promise();
	}
727
728
	/**
729
	 * Continues the given flow
730
	 *
731
	 * @param Flow\Envelope $flow
732
	 * @param Mqtt\Packet $packet
733
	 *
734
	 * @return void
735
	 */
736
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
	{
		try {
			$reply = $flow->next($packet);

		} catch (\Exception $failure) {
			$this->onError($failure, $this);

			return;
		}

		if ($reply === NULL) {
			// No reply to send; the flow is completed on the next tick if it is done
			if ($flow->isFinished()) {
				$this->loop->nextTick(function () use ($flow) {
					$this->finishFlow($flow);
				});
			}

			return;
		}

		if ($this->writtenFlow !== NULL) {
			// Another flow is currently being written, queue this one behind it
			$this->sendingFlows[] = $flow;

			return;
		}

		$this->stream->write($reply);
		$this->writtenFlow = $flow;

		$this->handleSend();
	}
764
765
	/**
766
	 * Finishes the given flow
767
	 *
768
	 * @param Flow\Envelope $flow
769
	 *
770
	 * @return void
771
	 */
772
	private function finishFlow(Flow\Envelope $flow) : void
	{
		if (!$flow->isSuccess()) {
			// Failed flows raise a warning and reject their promise
			$failure = new Exceptions\RuntimeException($flow->getErrorMessage());
			$this->onWarning($failure, $this);

			$flow->getDeferred()->reject($failure);

			return;
		}

		// Silent flows skip the event notification but still resolve their promise
		if (!$flow->isSilent()) {
			switch ($flow->getCode()) {
				case 'ping':
					$this->onPing($this);
					break;

				case 'pong':
					$this->onPong($this);
					break;

				case 'connect':
					$this->onConnect($flow->getResult(), $this);
					break;

				case 'disconnect':
					$this->onDisconnect($flow->getResult(), $this);
					break;

				case 'publish':
					$this->onPublish($flow->getResult(), $this);
					break;

				case 'subscribe':
					$this->onSubscribe($flow->getResult(), $this);
					break;

				case 'unsubscribe':
					$this->onUnsubscribe($flow->getResult(), $this);
					break;

				case 'message':
					$this->onMessage($flow->getResult(), $this);
					break;
			}
		}

		$flow->getDeferred()->resolve($flow->getResult());
	}
820
821
	/**
822
	 * @return void
823
	 */
824
	private function createConnector() : void
	{
		// Start with a plain TCP connector and wrap it layer by layer
		$connector = new Socket\TcpConnector($this->loop);

		if ($this->configuration->isDNSEnabled()) {
			// Host names are resolved through a caching resolver at the configured DNS address
			$resolver = (new Dns\Resolver\Factory)->createCached($this->configuration->getDNSAddress(), $this->loop);

			$connector = new Socket\DnsConnector($connector, $resolver);
		}

		if ($this->configuration->isSSLEnabled()) {
			// The secure connector wraps whatever was built so far
			$connector = new Socket\SecureConnector($connector, $this->loop, $this->configuration->getSSLConfiguration());
		}

		$this->connector = $connector;
	}
838
}
839