Passed
Push — master ( 030313...88332e )
by Adam
01:56
created

Client::publish()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 6
1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Nette;
20
21
use React\EventLoop;
22
use React\Dns;
23
use React\Promise;
24
use React\Socket;
25
use React\Stream;
26
27
use BinSoul\Net\Mqtt;
28
29
use IPub\MQTTClient\Exceptions;
30
use IPub\MQTTClient\Flow;
31
32
/**
33
 * Connection client
34
 *
35
 * @package        iPublikuj:MQTTClient!
36
 * @subpackage     Client
37
 *
38
 * @author         Adam Kadlec <[email protected]>
39
 *
40
 * @method onStart()
41
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
42
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
43
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
44
 * @method onClose(Mqtt\Connection $connection, IClient $client)
45
 * @method onPing(IClient $client)
46
 * @method onPong(IClient $client)
47
 * @method onPublish(Mqtt\Message $message, IClient $client)
48
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
49
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
50
 * @method onMessage(Mqtt\Message $message, IClient $client)
51
 * @method onWarning(\Exception $ex, IClient $client)
52
 * @method onError(\Exception $ex, IClient $client)
53
 */
54 1
final class Client implements IClient
55
{
56
	/**
57
	 * Implement nette smart magic
58
	 */
59 1
	use Nette\SmartObject;
60
61
	/**
62
	 * @var \Closure[]
63
	 */
64
	public $onStart = [];
65
66
	/**
67
	 * @var \Closure[]
68
	 */
69
	public $onOpen = [];
70
71
	/**
72
	 * @var \Closure[]
73
	 */
74
	public $onConnect = [];
75
76
	/**
77
	 * @var \Closure[]
78
	 */
79
	public $onDisconnect = [];
80
81
	/**
82
	 * @var \Closure[]
83
	 */
84
	public $onClose = [];
85
86
	/**
87
	 * @var \Closure[]
88
	 */
89
	public $onPing = [];
90
91
	/**
92
	 * @var \Closure[]
93
	 */
94
	public $onPong = [];
95
96
	/**
97
	 * @var \Closure[]
98
	 */
99
	public $onPublish = [];
100
101
	/**
102
	 * @var \Closure[]
103
	 */
104
	public $onSubscribe = [];
105
106
	/**
107
	 * @var \Closure[]
108
	 */
109
	public $onUnsubscribe = [];
110
111
	/**
112
	 * @var \Closure[]
113
	 */
114
	public $onMessage = [];
115
116
	/**
117
	 * @var \Closure[]
118
	 */
119
	public $onWarning = [];
120
121
	/**
122
	 * @var \Closure[]
123
	 */
124
	public $onError = [];
125
126
	/**
127
	 * @var EventLoop\LoopInterface
128
	 */
129
	private $loop;
130
131
	/**
132
	 * @var Configuration
133
	 */
134
	private $configuration;
135
136
	/**
137
	 * @var Socket\ConnectorInterface
138
	 */
139
	private $connector;
140
141
	/**
142
	 * @var Socket\ConnectionInterface|NULL
143
	 */
144
	private $stream = NULL;
145
146
	/**
147
	 * @var Mqtt\Connection|NULL
148
	 */
149
	private $connection;
150
151
	/**
152
	 * @var Mqtt\StreamParser
153
	 */
154
	private $parser;
155
156
	/**
157
	 * @var Mqtt\IdentifierGenerator
158
	 */
159
	private $identifierGenerator;
160
161
	/**
162
	 * @var bool
163
	 */
164
	private $isConnected = FALSE;
165
166
	/**
167
	 * @var bool
168
	 */
169
	private $isConnecting = FALSE;
170
171
	/**
172
	 * @var bool
173
	 */
174
	private $isDisconnecting = FALSE;
175
176
	/**
177
	 * @var string
178
	 */
179
	private $host;
180
181
	/**
182
	 * @var int
183
	 */
184
	private $port;
185
186
	/**
187
	 * @var int
188
	 */
189
	private $timeout = 5;
190
191
	/**
192
	 * @var Flow\Envelope[]
193
	 */
194
	private $receivingFlows = [];
195
196
	/**
197
	 * @var Flow\Envelope[]
198
	 */
199
	private $sendingFlows = [];
200
201
	/**
202
	 * @var Flow\Envelope|NULL
203
	 */
204
	private $writtenFlow;
205
206
	/**
207
	 * @var EventLoop\Timer\TimerInterface[]
208
	 */
209
	private $timer = [];
210
211
	/**
	 * Binds the client to an event loop and a configuration and builds the socket connector
	 *
	 * When no stream parser or identifier generator is passed in, a Mqtt\StreamParser
	 * and a Mqtt\DefaultIdentifierGenerator are created instead.
	 *
	 * @param EventLoop\LoopInterface $eventLoop
	 * @param Configuration $configuration
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator
	 * @param Mqtt\StreamParser|NULL $parser
	 */
	public function __construct(
		EventLoop\LoopInterface $eventLoop,
		Configuration $configuration,
		?Mqtt\IdentifierGenerator $identifierGenerator = NULL,
		?Mqtt\StreamParser $parser = NULL
	) {
		$this->loop = $eventLoop;
		$this->configuration = $configuration;

		$this->createConnector();

		$this->parser = $parser ?? new Mqtt\StreamParser;

		// Parser failures are reported through the client's own onError event
		$this->parser->onError(function (\Exception $ex) {
			$this->onError($ex, $this);
		});

		$this->identifierGenerator = $identifierGenerator ?? new Mqtt\DefaultIdentifierGenerator;
	}
244
245
	/**
	 * {@inheritdoc}
	 *
	 * The loop may only be replaced while the client is neither connected nor connecting.
	 * The socket connector is rebuilt afterwards so that it uses the new loop.
	 *
	 * @throws Exceptions\LogicException when the client is connected or connecting
	 */
	public function setLoop(EventLoop\LoopInterface $loop) : void
	{
		if ($this->isConnected || $this->isConnecting) {
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
		}

		$this->loop = $loop;

		$this->createConnector();
	}
259
260
	/**
	 * {@inheritdoc}
	 *
	 * @return EventLoop\LoopInterface the event loop currently assigned to this client
	 */
	public function getLoop() : EventLoop\LoopInterface
	{
		return $this->loop;
	}
267
268
	/**
	 * {@inheritdoc}
	 *
	 * The configuration may only be replaced while the client is neither connected nor connecting.
	 * The socket connector is rebuilt afterwards, because its DNS and SSL layers are derived
	 * from the configuration.
	 *
	 * @throws Exceptions\InvalidStateException when the client is connected or connecting
	 */
	public function setConfiguration(Configuration $configuration) : void
	{
		if ($this->isConnected() || $this->isConnecting) {
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
		}

		$this->configuration = $configuration;

		// Without this the connector would keep the DNS/SSL settings of the previous configuration
		$this->createConnector();
	}
279
280
	/**
	 * {@inheritdoc}
	 *
	 * @return string the URI reported by the current configuration
	 */
	public function getUri() : string
	{
		return $this->configuration->getUri();
	}
287
288
	/**
	 * {@inheritdoc}
	 *
	 * @return int the port reported by the current configuration
	 */
	public function getPort() : int
	{
		return $this->configuration->getPort();
	}
295
296
	/**
	 * {@inheritdoc}
	 *
	 * @return bool value of the internal connected flag
	 */
	public function isConnected() : bool
	{
		return $this->isConnected;
	}
303
304
	/**
	 * {@inheritdoc}
	 *
	 * Fires onStart, opens the socket, then registers the client with the broker.
	 * The returned promise resolves with the registered Mqtt\Connection, or rejects with
	 * the exception raised while opening the socket or registering the client.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	public function connect() : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected || $this->isConnecting) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
		}

		$this->onStart();

		$this->isConnecting = TRUE;
		$this->isConnected = FALSE;

		$settings = $this->configuration->getConnection();

		// An empty client ID is replaced with a generated one
		if ($settings->getClientID() === '') {
			$settings = $settings->withClientID($this->identifierGenerator->generateClientID());
		}

		$result = new Promise\Deferred;

		// Broker accepted the registration
		$onRegistered = function (Mqtt\Connection $established) use ($result) {
			$this->isConnecting = FALSE;
			$this->isConnected = TRUE;

			$this->connection = $established;

			$this->onConnect($established, $this);

			$result->resolve($this->connection);
		};

		// Registration failed: report it, reject, and drop the socket
		$onRegistrationFailed = function (\Exception $ex) use ($result, $settings) {
			$this->isConnecting = FALSE;

			$this->onError($ex, $this);

			$result->reject($ex);

			if ($this->stream !== NULL) {
				$this->stream->close();
			}

			$this->onClose($settings, $this);
		};

		// Socket is open: remember it and start the registration
		$onSocketOpened = function (Socket\ConnectionInterface $stream) use ($settings, $onRegistered, $onRegistrationFailed) {
			$this->stream = $stream;

			$this->onOpen($settings, $this);

			$this->registerClient($settings)
				->then($onRegistered)
				->otherwise($onRegistrationFailed);
		};

		// Socket could not be opened
		$onSocketFailed = function (\Exception $ex) use ($result) {
			$this->isConnecting = FALSE;

			$this->onError($ex, $this);

			$result->reject($ex);
		};

		$this->establishConnection()
			->then($onSocketOpened)
			->otherwise($onSocketFailed);

		return $result->promise();
	}
367
368
	/**
	 * {@inheritdoc}
	 *
	 * Starts a silent disconnect flow. On success the onDisconnect event is fired, the
	 * promise resolves with the connection and the stream is closed. On failure the
	 * promise is rejected with the reason the flow failed with.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	public function disconnect() : Promise\ExtendedPromiseInterface
	{
		if (!$this->isConnected || $this->isDisconnecting) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$this->isDisconnecting = TRUE;

		$deferred = new Promise\Deferred;

		$this->startFlow(new Mqtt\Flow\OutgoingDisconnectFlow($this->connection), TRUE)
			->then(function (Mqtt\Connection $connection) use ($deferred) {
				$this->isDisconnecting = FALSE;
				$this->isConnected = FALSE;

				$this->onDisconnect($connection, $this);

				$deferred->resolve($connection);

				if ($this->stream !== NULL) {
					$this->stream->close();
				}
			})
			// Untyped parameter on purpose: every rejection reason must reset the flag
			->otherwise(function ($reason) use ($deferred) {
				$this->isDisconnecting = FALSE;

				// Pass the actual failure on instead of the connection object,
				// so callers get to know why the disconnect failed
				$deferred->reject($reason);
			});

		return $deferred->promise();
	}
401
402
	/**
	 * {@inheritdoc}
	 *
	 * Starts an outgoing subscribe flow for the given subscription. The returned promise
	 * is rejected with a LogicException when the client is not connected.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
413
414
	/**
	 * {@inheritdoc}
	 *
	 * Starts an outgoing unsubscribe flow for the given subscription. The returned promise
	 * is rejected with a LogicException when the client is not connected.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
425
426
	/**
	 * {@inheritdoc}
	 *
	 * Starts an outgoing publish flow for the given message. The returned promise
	 * is rejected with a LogicException when the client is not connected.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
	{
		if ($this->isConnected) {
			$flow = new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator);

			return $this->startFlow($flow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
437
438
	/**
	 * {@inheritdoc}
	 *
	 * Registers a periodic timer. On every tick the generator is called with the message
	 * topic and its return value is published as the payload of a copy of the message.
	 * Each successful publish is reported through the promise's progress notification;
	 * a failed publish rejects the promise.
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	public function publishPeriodically(
		int $interval,
		Mqtt\Message $message,
		callable $generator
	) : Promise\ExtendedPromiseInterface {
		if (!$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$progress = new Promise\Deferred;

		$onPublished = function ($value) use ($progress) {
			$progress->notify($value);
		};

		$onFailed = function (\Exception $e) use ($progress) {
			$progress->reject($e);
		};

		$tick = function () use ($message, $generator, $onPublished, $onFailed) {
			$payload = $generator($message->getTopic());

			$this->publish($message->withPayload($payload))
				->then($onPublished, $onFailed);
		};

		// The timer is remembered so that it is cancelled when the stream closes
		$this->timer[] = $this->loop->addPeriodicTimer($interval, $tick);

		return $progress->promise();
	}
466
467
	/**
	 * Establishes a network connection to a server
	 *
	 * The connect attempt is guarded by a timer of $this->timeout seconds. When the timer
	 * fires first, the returned promise is rejected, the pending attempt is cancelled when
	 * the connector's promise supports it, and a stream that still arrives afterwards is
	 * closed instead of being wired to the client.
	 *
	 * @return Promise\ExtendedPromiseInterface resolves with the opened stream
	 */
	private function establishConnection() : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		// Both are captured by reference: the timer callback is created before the attempt starts
		$pending = NULL;
		$timedOut = FALSE;

		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred, &$pending, &$timedOut) {
			$timedOut = TRUE;

			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));

			// Reject first so the caller sees the timeout and not the cancellation error
			$deferred->reject($exception);

			if ($pending instanceof Promise\CancellablePromiseInterface) {
				$pending->cancel();
			}
		});

		$pending = $this->connector->connect($this->configuration->getUri());

		$pending
			->always(function () use ($timer) {
				$this->loop->cancelTimer($timer);
			})
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred, &$timedOut) {
				if ($timedOut) {
					// The caller has already been told the attempt failed; do not leak the socket
					$stream->close();

					return;
				}

				$stream->on('data', function ($data) {
					$this->handleReceive($data);
				});

				$stream->on('close', function () {
					$this->handleClose();
				});

				$stream->on('error', function (\Exception $ex) {
					$this->handleError($ex);
				});

				$deferred->resolve($stream);
			})
			->otherwise(function (\Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
507
508
	/**
	 * Registers a new client with the broker
	 *
	 * Starts a silent connect flow guarded by a response timer of $this->timeout seconds.
	 * After a successful registration a periodic ping is scheduled at 75 % of the keep
	 * alive interval, rounded down but never below one second. No ping is scheduled when
	 * the keep alive is zero.
	 *
	 * @param Mqtt\Connection $connection
	 *
	 * @return Promise\ExtendedPromiseInterface resolves with the registered connection
	 */
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
			->always(function () use ($responseTimer) {
				$this->loop->cancelTimer($responseTimer);

			})->then(function (Mqtt\Connection $connection) use ($deferred) {
				$keepAlive = $connection->getKeepAlive();

				// In MQTT a keep alive of zero switches the mechanism off. A keep alive of one
				// would round down to a zero interval, i.e. a timer firing continuously.
				if ($keepAlive > 0) {
					$interval = max(1.0, floor($keepAlive * 0.75));

					$this->timer[] = $this->loop->addPeriodicTimer($interval, function () {
						$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
					});
				}

				$deferred->resolve($connection);

			})->otherwise(function (\Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
542
543
	/**
	 * Handles incoming data
	 *
	 * Data is ignored unless the client is connected or connecting. Otherwise it is fed
	 * to the parser and every packet the parser returns is dispatched. The list of
	 * receiving flows is re-indexed when dispatching removed entries from it.
	 *
	 * @param string $data
	 *
	 * @return void
	 */
	private function handleReceive(string $data) : void
	{
		if (!($this->isConnected || $this->isConnecting)) {
			return;
		}

		$flowsBefore = count($this->receivingFlows);

		foreach ($this->parser->push($data) as $packet) {
			$this->handlePacket($packet);
		}

		if (count($this->receivingFlows) < $flowsBefore) {
			$this->receivingFlows = array_values($this->receivingFlows);
		}
	}
568
569
	/**
	 * Handles an incoming packet
	 *
	 * A PUBLISH packet starts a new incoming publish flow. Acknowledgement and response
	 * packets are handed to the first receiving flow that accepts them. Packets that no
	 * flow accepts, and packets of any other type, are reported through onWarning.
	 *
	 * @param Mqtt\Packet $packet
	 *
	 * @return void
	 */
	private function handlePacket(Mqtt\Packet $packet) : void
	{
		$type = $packet->getPacketType();

		switch ($type) {
			case Mqtt\Packet::TYPE_PUBLISH:
				/** @var Mqtt\Packet\PublishRequestPacket $packet */
				$incoming = new Mqtt\DefaultMessage(
					$packet->getTopic(),
					$packet->getPayload(),
					$packet->getQosLevel(),
					$packet->isRetained(),
					$packet->isDuplicate()
				);

				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($incoming, $packet->getIdentifier()));

				return;

			case Mqtt\Packet::TYPE_CONNACK:
			case Mqtt\Packet::TYPE_PINGRESP:
			case Mqtt\Packet::TYPE_SUBACK:
			case Mqtt\Packet::TYPE_UNSUBACK:
			case Mqtt\Packet::TYPE_PUBREL:
			case Mqtt\Packet::TYPE_PUBACK:
			case Mqtt\Packet::TYPE_PUBREC:
			case Mqtt\Packet::TYPE_PUBCOMP:
				foreach ($this->receivingFlows as $key => $envelope) {
					if (!$envelope->accept($packet)) {
						continue;
					}

					// Only the first accepting flow gets the packet
					unset($this->receivingFlows[$key]);
					$this->continueFlow($envelope, $packet);

					return;
				}

				$this->onWarning(
					new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $type)),
					$this
				);

				return;

			default:
				$this->onWarning(
					new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $type)),
					$this
				);
		}
	}
626
627
	/**
	 * Handles outgoing packets
	 *
	 * Takes the flow that was written last, writes the next queued flow (if any) to the
	 * stream, and then either schedules the taken flow to be finished on the next tick
	 * or parks it among the receiving flows.
	 *
	 * @return void
	 */
	private function handleSend() : void
	{
		// Stays NULL when nothing had been written
		$written = $this->writtenFlow;
		$this->writtenFlow = NULL;

		if ($this->sendingFlows !== []) {
			$this->writtenFlow = array_shift($this->sendingFlows);
			$this->stream->write($this->writtenFlow->getPacket());
		}

		if ($written === NULL) {
			return;
		}

		if (!$written->isFinished()) {
			$this->receivingFlows[] = $written;

			return;
		}

		$this->loop->nextTick(function () use ($written) {
			$this->finishFlow($written);
		});
	}
657
658
	/**
	 * Handles closing of the stream
	 *
	 * Cancels and forgets every timer registered by this client, resets the connection
	 * state and fires onClose when a registered connection existed.
	 *
	 * @return void
	 */
	private function handleClose() : void
	{
		foreach ($this->timer as $timer) {
			$this->loop->cancelTimer($timer);
		}

		// Drop the cancelled timers, otherwise the list keeps growing with every reconnect
		$this->timer = [];

		$connection = $this->connection;

		$this->isConnecting = FALSE;
		$this->isDisconnecting = FALSE;
		$this->isConnected = FALSE;
		$this->connection = NULL;
		$this->stream = NULL;

		if ($connection !== NULL) {
			$this->onClose($connection, $this);
		}
	}
681
682
	/**
	 * Handles errors of the stream
	 *
	 * Forwards the exception to the client's onError event.
	 *
	 * @param \Exception $ex
	 *
	 * @return void
	 */
	private function handleError(\Exception $ex) : void
	{
		$this->onError($ex, $this);
	}
693
694
	/**
	 * Starts the given flow
	 *
	 * A flow whose start() throws is reported through onError and yields a rejected
	 * promise. A flow that produces no packet is finished on the next tick. Otherwise
	 * the packet is written to the stream, or queued while another flow is being written.
	 *
	 * @param Mqtt\Flow $flow
	 * @param bool $isSilent
	 *
	 * @return Promise\ExtendedPromiseInterface
	 */
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
	{
		try {
			$packet = $flow->start();

		} catch (\Exception $ex) {
			$this->onError($ex, $this);

			return new Promise\RejectedPromise($ex);
		}

		$deferred = new Promise\Deferred;
		$envelope = new Flow\Envelope($flow, $deferred, $packet, $isSilent);

		if ($packet === NULL) {
			// Nothing to send
			$this->loop->nextTick(function () use ($envelope) {
				$this->finishFlow($envelope);
			});

			return $deferred->promise();
		}

		if ($this->writtenFlow !== NULL) {
			// Another flow is being written
			$this->sendingFlows[] = $envelope;

			return $deferred->promise();
		}

		$this->stream->write($packet);
		$this->writtenFlow = $envelope;

		$this->handleSend();

		return $deferred->promise();
	}
735
736
	/**
	 * Continues the given flow
	 *
	 * Passes the packet to the flow. An exception thrown by the flow is reported through
	 * onError. A response packet is written to the stream, or queued while another flow
	 * is being written. Without a response, a finished flow is finished on the next tick.
	 *
	 * @param Flow\Envelope $flow
	 * @param Mqtt\Packet $packet
	 *
	 * @return void
	 */
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
	{
		try {
			$reply = $flow->next($packet);

		} catch (\Exception $ex) {
			$this->onError($ex, $this);

			return;
		}

		if ($reply === NULL) {
			if ($flow->isFinished()) {
				$this->loop->nextTick(function () use ($flow) {
					$this->finishFlow($flow);
				});
			}

			return;
		}

		if ($this->writtenFlow !== NULL) {
			// Another flow is being written
			$this->sendingFlows[] = $flow;

			return;
		}

		$this->stream->write($reply);
		$this->writtenFlow = $flow;

		$this->handleSend();
	}
772
773
	/**
	 * Finishes the given flow
	 *
	 * A failed flow is reported through onWarning and its deferred is rejected with a
	 * RuntimeException carrying the flow's error message. A successful flow fires the
	 * event matching its code, unless the flow is silent, and resolves its deferred
	 * with the flow result.
	 *
	 * @param Flow\Envelope $flow
	 *
	 * @return void
	 */
	private function finishFlow(Flow\Envelope $flow) : void
	{
		if (!$flow->isSuccess()) {
			$failure = new Exceptions\RuntimeException($flow->getErrorMessage());
			$this->onWarning($failure, $this);

			$flow->getDeferred()->reject($failure);

			return;
		}

		if (!$flow->isSilent()) {
			$code = $flow->getCode();

			switch ($code) {
				case 'ping':
					$this->onPing($this);
					break;

				case 'pong':
					$this->onPong($this);
					break;

				case 'connect':
					$this->onConnect($flow->getResult(), $this);
					break;

				case 'disconnect':
					$this->onDisconnect($flow->getResult(), $this);
					break;

				case 'publish':
					$this->onPublish($flow->getResult(), $this);
					break;

				case 'subscribe':
					$this->onSubscribe($flow->getResult(), $this);
					break;

				case 'unsubscribe':
					$this->onUnsubscribe($flow->getResult(), $this);
					break;

				case 'message':
					$this->onMessage($flow->getResult(), $this);
					break;
			}
		}

		$flow->getDeferred()->resolve($flow->getResult());
	}
828
829
	/**
	 * Builds the socket connector for the current loop and configuration
	 *
	 * Starts with a TCP connector, wraps it in a DNS connector when DNS is enabled in
	 * the configuration, and wraps the result in a secure connector when SSL is enabled.
	 *
	 * @return void
	 */
	private function createConnector() : void
	{
		$connector = new Socket\TcpConnector($this->loop);

		if ($this->configuration->isDNSEnabled()) {
			$resolver = (new Dns\Resolver\Factory)->createCached($this->configuration->getDNSAddress(), $this->loop);

			$connector = new Socket\DnsConnector($connector, $resolver);
		}

		if ($this->configuration->isSSLEnabled()) {
			$connector = new Socket\SecureConnector($connector, $this->loop, $this->configuration->getSSLConfiguration());
		}

		$this->connector = $connector;
	}
846
}
847