Completed
Pull Request — master (#6)
by Adam
07:08
created

Client::createConnector()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 7
cts 7
cp 1
rs 9.7998
c 0
b 0
f 0
cc 3
nc 4
nop 0
crap 3
1
<?php
2
/**
3
 * Client.php
4
 *
5
 * @copyright      More in license.md
6
 * @license        http://www.ipublikuj.eu
7
 * @author         Adam Kadlec http://www.ipublikuj.eu
8
 * @package        iPublikuj:MQTTClient!
9
 * @subpackage     Client
10
 * @since          1.0.0
11
 *
12
 * @date           12.03.17
13
 */
14
15
declare(strict_types = 1);
16
17
namespace IPub\MQTTClient\Client;
18
19
use Closure;
20
21
use Nette;
22
23
use React\EventLoop;
24
use React\Dns;
25
use React\Promise;
26
use React\Socket;
27
use React\Stream;
28
29
use BinSoul\Net\Mqtt;
30
31
use IPub\MQTTClient\Configuration;
32
use IPub\MQTTClient\Exceptions;
33
use IPub\MQTTClient\Flow;
34
35
/**
36
 * Connection client
37
 *
38
 * @package        iPublikuj:MQTTClient!
39
 * @subpackage     Client
40
 *
41
 * @author         Adam Kadlec <[email protected]>
42
 *
43
 * @method onStart()
44
 * @method onOpen(Mqtt\Connection $connection, IClient $client)
45
 * @method onConnect(Mqtt\Connection $connection, IClient $client)
46
 * @method onDisconnect(Mqtt\Connection $connection, IClient $client)
47
 * @method onClose(Mqtt\Connection $connection, IClient $client)
48
 * @method onPing(IClient $client)
49
 * @method onPong(IClient $client)
50
 * @method onPublish(Mqtt\Message $message, IClient $client)
51
 * @method onSubscribe(Mqtt\Subscription $subscription, IClient $client)
52
 * @method onUnsubscribe(Mqtt\Subscription $subscription, IClient $client)
53
 * @method onMessage(Mqtt\Message $message, IClient $client)
54
 * @method onWarning(\Exception $ex, IClient $client)
55
 * @method onError(\Exception $ex, IClient $client)
56
 */
57 1
final class Client implements IClient
58
{
59
	/**
60
	 * Implement nette smart magic
61
	 */
62 1
	use Nette\SmartObject;
63
64
	/**
65
	 * @var callable[]
66
	 */
67
	public $onStart = [];
68
69
	/**
70
	 * @var callable[]
71
	 */
72
	public $onOpen = [];
73
74
	/**
75
	 * @var callable[]
76
	 */
77
	public $onConnect = [];
78
79
	/**
80
	 * @var callable[]
81
	 */
82
	public $onDisconnect = [];
83
84
	/**
85
	 * @var callable[]
86
	 */
87
	public $onClose = [];
88
89
	/**
90
	 * @var callable[]
91
	 */
92
	public $onPing = [];
93
94
	/**
95
	 * @var callable[]
96
	 */
97
	public $onPong = [];
98
99
	/**
100
	 * @var callable[]
101
	 */
102
	public $onPublish = [];
103
104
	/**
105
	 * @var callable[]
106
	 */
107
	public $onSubscribe = [];
108
109
	/**
110
	 * @var callable[]
111
	 */
112
	public $onUnsubscribe = [];
113
114
	/**
115
	 * @var callable[]
116
	 */
117
	public $onMessage = [];
118
119
	/**
120
	 * @var callable[]
121
	 */
122
	public $onWarning = [];
123
124
	/**
125
	 * @var callable[]
126
	 */
127
	public $onError = [];
128
129
	/**
130
	 * @var EventLoop\LoopInterface
131
	 */
132
	private $loop;
133
134
	/**
135
	 * @var Configuration\Broker
136
	 */
137
	private $configuration;
138
139
	/**
140
	 * @var Socket\ConnectorInterface
141
	 */
142
	private $connector;
143
144
	/**
145
	 * @var Socket\ConnectionInterface|NULL
146
	 */
147
	private $stream = NULL;
148
149
	/**
150
	 * @var Mqtt\Connection|NULL
151
	 */
152
	private $connection;
153
154
	/**
155
	 * @var Mqtt\StreamParser
156
	 */
157
	private $parser;
158
159
	/**
160
	 * @var Mqtt\IdentifierGenerator
161
	 */
162
	private $identifierGenerator;
163
164
	/**
165
	 * @var bool
166
	 */
167
	private $isConnected = FALSE;
168
169
	/**
170
	 * @var bool
171
	 */
172
	private $isConnecting = FALSE;
173
174
	/**
175
	 * @var bool
176
	 */
177
	private $isDisconnecting = FALSE;
178
179
	/**
180
	 * @var string
181
	 */
182
	private $host;
183
184
	/**
185
	 * @var int
186
	 */
187
	private $port;
188
189
	/**
190
	 * @var int
191
	 */
192
	private $timeout = 5;
193
194
	/**
195
	 * @var Flow\Envelope[]
196
	 */
197
	private $receivingFlows = [];
198
199
	/**
200
	 * @var Flow\Envelope[]
201
	 */
202
	private $sendingFlows = [];
203
204
	/**
205
	 * @var Flow\Envelope|NULL
206
	 */
207
	private $writtenFlow;
208
209
	/**
210
	 * @var EventLoop\TimerInterface[]
211
	 */
212
	private $timer = [];
213
214
	/**
215
	 * @var Mqtt\Message|NULL
216
	 */
217
	private $lastMessage = NULL;
218
219
	/**
220
	 * @param EventLoop\LoopInterface $eventLoop
221
	 * @param Configuration\Broker $configuration
222
	 * @param Mqtt\IdentifierGenerator|NULL $identifierGenerator
223
	 * @param Mqtt\StreamParser|NULL $parser
224
	 */
225
	public function __construct(
		EventLoop\LoopInterface $eventLoop,
		Configuration\Broker $configuration,
		?Mqtt\IdentifierGenerator $identifierGenerator = NULL,
		?Mqtt\StreamParser $parser = NULL
	) {
		$this->loop = $eventLoop;
		$this->configuration = $configuration;

		// The connector is built from both the loop and the broker configuration,
		// so both have to be assigned before this call
		$this->createConnector();

		// Fall back to the library's default parser when none is injected
		$this->parser = $parser ?? new Mqtt\StreamParser;

		// Parser failures are forwarded to the client's onError listeners
		$this->parser->onError(function (\Exception $ex) {
			$this->onError($ex, $this);
		});

		// Fall back to the library's default identifier generator when none is injected
		$this->identifierGenerator = $identifierGenerator ?? new Mqtt\DefaultIdentifierGenerator;
	}
252
253
	/**
254
	 * {@inheritdoc}
255
	 */
256
	public function setLoop(EventLoop\LoopInterface $loop) : void
	{
		// The loop may only be replaced while the client is neither connected nor connecting
		if ($this->isConnected || $this->isConnecting) {
			throw new Exceptions\LogicException('Connection is already established. React event loop could not be changed.');
		}

		$this->loop = $loop;

		// The connector is constructed with the loop, so it has to be rebuilt for the new one
		$this->createConnector();
	}
267
268
	/**
269
	 * {@inheritdoc}
270
	 */
271
	public function getLoop() : EventLoop\LoopInterface
	{
		// Event loop currently assigned to this client
		$currentLoop = $this->loop;

		return $currentLoop;
	}
275
276
	/**
277
	 * {@inheritdoc}
278
	 *
279
	 * @throws Exceptions\InvalidStateException
280
	 */
281
	public function setConfiguration(Configuration\Broker $configuration) : void
	{
		// The configuration may only be replaced while the client is neither connected nor connecting
		if ($this->isConnected() || $this->isConnecting) {
			throw new Exceptions\InvalidStateException('Client is connecting or connected to the broker, therefore configuration could not be changed.');
		}

		$this->configuration = $configuration;

		// The connector is assembled from the configuration's DNS and SSL settings.
		// Without rebuilding it, the next connect() would still use the transport
		// options of the previous configuration.
		$this->createConnector();
	}
289
290
	/**
291
	 * {@inheritdoc}
292
	 *
293
	 * @throws Exceptions\InvalidStateException
294
	 */
295
	public function getUri() : string
	{
		// The URI is owned by the broker configuration; the client only exposes it
		$uri = $this->configuration->getUri();

		return $uri;
	}
299
300
	/**
301
	 * {@inheritdoc}
302
	 */
303
	public function getPort() : int
	{
		// The port is owned by the broker configuration; the client only exposes it
		$port = $this->configuration->getPort();

		return $port;
	}
307
308
	/**
309
	 * {@inheritdoc}
310
	 */
311
	public function isConnected() : bool
	{
		// Flag is raised in connect() after successful registration and cleared on disconnect/close
		$connected = $this->isConnected;

		return $connected;
	}
315
316
	/**
317
	 * {@inheritdoc}
318
	 */
319
	public function connect() : Promise\ExtendedPromiseInterface
	{
		// A second attempt is refused while the client is connected or still connecting
		if ($this->isConnected || $this->isConnecting) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is already connected.'));
		}

		$this->onStart();

		$this->isConnecting = TRUE;
		$this->isConnected = FALSE;

		$connection = $this->configuration->getConnection();

		// An empty client identifier is replaced by a generated one
		if ($connection->getClientID() === '') {
			$connection->setClientID($this->identifierGenerator->generateClientID());
		}

		$deferred = new Promise\Deferred;

		// Network connection could not be established
		$onStreamFailed = function (\Exception $ex) use ($deferred) {
			$this->isConnecting = FALSE;

			$this->onError($ex, $this);

			$deferred->reject($ex);
		};

		// Network connection is open; continue with the registration at the broker
		$onStreamOpened = function (Socket\ConnectionInterface $stream) use ($connection, $deferred) {
			$this->stream = $stream;

			$this->onOpen($connection, $this);

			// Broker accepted the client
			$onRegistered = function (Mqtt\Connection $connection) use ($deferred) {
				$this->isConnecting = FALSE;
				$this->isConnected = TRUE;

				$this->connection = $connection;

				$this->onConnect($connection, $this);

				$deferred->resolve($this->connection);
			};

			// Registration failed: report the error, then close the already opened stream
			$onRegistrationFailed = function (\Exception $ex) use ($stream, $deferred, $connection) {
				$this->isConnecting = FALSE;

				$this->onError($ex, $this);

				$deferred->reject($ex);

				if ($this->stream !== NULL) {
					$this->stream->close();
				}

				$this->onClose($connection, $this);
			};

			$this->registerClient($connection)
				->then($onRegistered)
				->otherwise($onRegistrationFailed);
		};

		$this->establishConnection()
			->then($onStreamOpened)
			->otherwise($onStreamFailed);

		return $deferred->promise();
	}
379
380
	/**
381
	 * {@inheritdoc}
382
	 */
383
	public function disconnect() : Promise\ExtendedPromiseInterface
	{
		// Nothing to do when there is no connection or a disconnect is already running
		if (!$this->isConnected || $this->isDisconnecting) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$this->isDisconnecting = TRUE;

		$deferred = new Promise\Deferred;

		// Disconnect flow finished: notify listeners, settle the promise, then close the stream
		$onDisconnected = function (Mqtt\Connection $connection) use ($deferred) {
			$this->isDisconnecting = FALSE;
			$this->isConnected = FALSE;

			$this->onDisconnect($connection, $this);

			$deferred->resolve($connection);

			if ($this->stream !== NULL) {
				$this->stream->close();
			}
		};

		// NOTE(review): the rejection reason is the connection object, not the caught error
		$onFailed = function () use ($deferred) {
			$this->isDisconnecting = FALSE;
			$deferred->reject($this->connection);
		};

		$disconnectFlow = new Mqtt\Flow\OutgoingDisconnectFlow($this->connection);

		// The flow is started silently, the disconnect event is fired by the callback above
		$this->startFlow($disconnectFlow, TRUE)
			->then($onDisconnected)
			->otherwise($onFailed);

		return $deferred->promise();
	}
413
414
	/**
415
	 * {@inheritdoc}
416
	 */
417 View Code Duplication
	public function subscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		// Subscribing is only possible on an established connection
		if ($this->isConnected) {
			$subscribeFlow = new Mqtt\Flow\OutgoingSubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($subscribeFlow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
425
426
	/**
427
	 * {@inheritdoc}
428
	 */
429 View Code Duplication
	public function unsubscribe(Mqtt\Subscription $subscription) : Promise\ExtendedPromiseInterface
	{
		// Unsubscribing is only possible on an established connection
		if ($this->isConnected) {
			$unsubscribeFlow = new Mqtt\Flow\OutgoingUnsubscribeFlow([$subscription], $this->identifierGenerator);

			return $this->startFlow($unsubscribeFlow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
437
438
	/**
439
	 * {@inheritdoc}
440
	 */
441
	public function publish(Mqtt\Message $message) : Promise\ExtendedPromiseInterface
	{
		// Publishing is only possible on an established connection
		if ($this->isConnected) {
			$publishFlow = new Mqtt\Flow\OutgoingPublishFlow($message, $this->identifierGenerator);

			return $this->startFlow($publishFlow);
		}

		return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
	}
449
450
	/**
451
	 * {@inheritdoc}
452
	 */
453
	public function publishPeriodically(
		int $interval,
		Mqtt\Message $message,
		callable $generator
	) : Promise\ExtendedPromiseInterface {
		if (!$this->isConnected) {
			return new Promise\RejectedPromise(new Exceptions\LogicException('The client is not connected.'));
		}

		$deferred = new Promise\Deferred;

		// Each successful publication is reported as a progress notification of the returned promise
		$onPublished = function ($value) use ($deferred) {
			$deferred->notify($value);
		};

		// A failed publication rejects the returned promise
		$onFailed = function (\Exception $e) use ($deferred) {
			$deferred->reject($e);
		};

		// On every tick the generator is called with the topic and its result is published as the payload
		$onTick = function () use ($message, $generator, $onPublished, $onFailed) {
			$payload = $generator($message->getTopic());

			$this->publish($message->withPayload($payload))
				->then($onPublished, $onFailed);
		};

		// The timer is remembered so that handleClose() can cancel it
		$this->timer[] = $this->loop->addPeriodicTimer($interval, $onTick);

		return $deferred->promise();
	}
478
479
	/**
480
	 * Establishes a network connection to a server
481
	 *
482
	 * @return Promise\ExtendedPromiseInterface
483
	 *
484
	 * @throws Exceptions\InvalidStateException
485
	 */
486
	private function establishConnection() : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		// Raised by the timeout timer so that a connection completing afterwards can be discarded
		$timedOut = FALSE;

		$timer = $this->loop->addTimer($this->timeout, function () use ($deferred, &$timedOut) {
			$timedOut = TRUE;

			$exception = new Exceptions\RuntimeException(sprintf('Connection timed out after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		$this->connector->connect($this->configuration->getUri())
			->always(function () use ($timer) {
				// The connector has settled, the timeout is no longer needed
				$this->loop->cancelTimer($timer);
			})
			->then(function (Stream\DuplexStreamInterface $stream) use ($deferred, &$timedOut) {
				if ($timedOut) {
					// The caller has already been told that the attempt failed. Without this guard
					// the late stream would stay open with handlers attached, and its eventual
					// "close" event would run handleClose() against whatever connection is current.
					$stream->close();

					return;
				}

				// Wire the stream events to the client's handlers
				$stream->on('data', function ($data) {
					$this->handleReceive($data);
				});

				$stream->on('close', function () {
					$this->handleClose();
				});

				$stream->on('error', function (\Exception $ex) {
					$this->handleError($ex);
				});

				$deferred->resolve($stream);
			})
			->otherwise(function (\Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
521
522
	/**
523
	 * Registers a new client with the broker
524
	 *
525
	 * @param Mqtt\Connection $connection
526
	 *
527
	 * @return Promise\ExtendedPromiseInterface
528
	 */
529
	private function registerClient(Mqtt\Connection $connection) : Promise\ExtendedPromiseInterface
	{
		$deferred = new Promise\Deferred;

		// Give up when the broker does not answer the connect packet in time
		$responseTimer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
			$exception = new Exceptions\RuntimeException(sprintf('No response after %d seconds.', $this->timeout));

			$deferred->reject($exception);
		});

		// The connect flow is started silently; connect() fires the onConnect event itself
		$this->startFlow(new Mqtt\Flow\OutgoingConnectFlow($connection, $this->identifierGenerator), TRUE)
			->always(function () use ($responseTimer) {
				$this->loop->cancelTimer($responseTimer);

			})->then(function (Mqtt\Connection $connection) use ($deferred) {
				// assumes getKeepAlive() returns seconds as a number - TODO confirm
				$keepAlive = $connection->getKeepAlive();

				// A keep alive of zero turns the mechanism off (MQTT 3.1.1, section 3.1.2.10),
				// so no ping timer is scheduled in that case
				if ($keepAlive > 0) {
					// Ping at 75 % of the keep alive, but at least one second apart:
					// floor() yields 0 for a keep alive of 1, which would create a zero-interval timer
					$pingInterval = max(1, (int) floor($keepAlive * 0.75));

					$this->timer[] = $this->loop->addPeriodicTimer($pingInterval, function () {
						$this->startFlow(new Mqtt\Flow\OutgoingPingFlow);
					});
				}

				$deferred->resolve($connection);

			})->otherwise(function (\Exception $ex) use ($deferred) {
				$deferred->reject($ex);
			});

		return $deferred->promise();
	}
556
557
	/**
558
	 * Handles incoming data
559
	 *
560
	 * @param string $data
561
	 *
562
	 * @return void
563
	 */
564
	private function handleReceive(string $data) : void
	{
		// Data received while the client is neither connected nor connecting is discarded
		if (!($this->isConnected || $this->isConnecting)) {
			return;
		}

		$waitingBefore = count($this->receivingFlows);

		// The parser turns the raw bytes into zero or more complete packets
		foreach ($this->parser->push($data) as $packet) {
			$this->handlePacket($packet);
		}

		// handlePacket() unsets matched flows, which leaves gaps in the keys; re-index if any were removed
		if (count($this->receivingFlows) < $waitingBefore) {
			$this->receivingFlows = array_values($this->receivingFlows);
		}
	}
582
583
	/**
584
	 * Handles an incoming packet
585
	 *
586
	 * @param Mqtt\Packet $packet
587
	 *
588
	 * @return void
589
	 */
590
	private function handlePacket(Mqtt\Packet $packet) : void
	{
		switch ($packet->getPacketType()) {
			case Mqtt\Packet::TYPE_PUBLISH:
				// A message published by the broker starts a new incoming flow
				/* @var Mqtt\Packet\PublishRequestPacket $packet */
				$incoming = new Mqtt\DefaultMessage(
					$packet->getTopic(),
					$packet->getPayload(),
					$packet->getQosLevel(),
					$packet->isRetained(),
					$packet->isDuplicate()
				);

				$this->startFlow(new Mqtt\Flow\IncomingPublishFlow($incoming, $packet->getIdentifier()));

				return;

			case Mqtt\Packet::TYPE_CONNACK:
			case Mqtt\Packet::TYPE_PINGRESP:
			case Mqtt\Packet::TYPE_SUBACK:
			case Mqtt\Packet::TYPE_UNSUBACK:
			case Mqtt\Packet::TYPE_PUBREL:
			case Mqtt\Packet::TYPE_PUBACK:
			case Mqtt\Packet::TYPE_PUBREC:
			case Mqtt\Packet::TYPE_PUBCOMP:
				// Response packets are handed to the first waiting flow that accepts them
				foreach ($this->receivingFlows as $key => $waitingFlow) {
					if (!$waitingFlow->accept($packet)) {
						continue;
					}

					unset($this->receivingFlows[$key]);
					$this->continueFlow($waitingFlow, $packet);

					return;
				}

				// No waiting flow accepted the packet
				$this->onWarning(
					new Exceptions\LogicException(sprintf('Received unexpected packet of type %d.', $packet->getPacketType())),
					$this
				);

				return;

			default:
				$this->onWarning(
					new Exceptions\LogicException(sprintf('Cannot handle packet of type %d.', $packet->getPacketType())),
					$this
				);
		}
	}
640
641
	/**
642
	 * Handles outgoing packets
643
	 *
644
	 * @return void
645
	 */
646
	private function handleSend() : void
	{
		// Take over the flow whose packet was just written and free the write slot
		$written = $this->writtenFlow;
		$this->writtenFlow = NULL;

		// Write the packet of the next queued flow, if there is one
		if ($this->sendingFlows !== []) {
			$this->writtenFlow = array_shift($this->sendingFlows);
			$this->stream->write($this->writtenFlow->getPacket());
		}

		if ($written === NULL) {
			return;
		}

		// An unfinished flow waits for a response packet from the broker
		if (!$written->isFinished()) {
			$this->receivingFlows[] = $written;

			return;
		}

		// A finished flow is completed on the next loop tick
		$this->loop->futureTick(function () use ($written) {
			$this->finishFlow($written);
		});
	}
671
672
	/**
673
	 * Handles closing of the stream
674
	 *
675
	 * @return void
676
	 */
677
	private function handleClose() : void
	{
		// Stop all keep-alive and periodic publish timers registered for this connection
		foreach ($this->timer as $timer) {
			$this->loop->cancelTimer($timer);
		}

		// Forget the cancelled timers; otherwise the list keeps growing with every
		// reconnect and stale timers are cancelled again on each subsequent close
		$this->timer = [];

		// Remember the connection for the event before the state is reset
		$connection = $this->connection;

		$this->isConnecting = FALSE;
		$this->isDisconnecting = FALSE;
		$this->isConnected = FALSE;
		$this->connection = NULL;
		$this->stream = NULL;

		// The close event is only fired when a broker connection had been registered
		if ($connection !== NULL) {
			$this->onClose($connection, $this);
		}
	}
695
696
	/**
697
	 * Handles errors of the stream
698
	 *
699
	 * @param \Exception $ex
700
	 *
701
	 * @return void
702
	 */
703
	private function handleError(\Exception $ex) : void
	{
		// Stream errors are passed on unchanged to the onError listeners
		$client = $this;

		$this->onError($ex, $client);
	}
707
708
	/**
709
	 * Starts the given flow
710
	 *
711
	 * @param Mqtt\Flow $flow
712
	 * @param bool $isSilent
713
	 *
714
	 * @return Promise\ExtendedPromiseInterface
715
	 */
716
	private function startFlow(Mqtt\Flow $flow, bool $isSilent = FALSE) : Promise\ExtendedPromiseInterface
	{
		try {
			$firstPacket = $flow->start();

		} catch (\Exception $ex) {
			// A flow that cannot be started is reported and yields a rejected promise
			$this->onError($ex, $this);

			return new Promise\RejectedPromise($ex);
		}

		$deferred = new Promise\Deferred;
		$envelope = new Flow\Envelope($flow, $deferred, $firstPacket, $isSilent);

		// A flow without an initial packet is completed on the next loop tick
		if ($firstPacket === NULL) {
			$this->loop->futureTick(function () use ($envelope) {
				$this->finishFlow($envelope);
			});

			return $deferred->promise();
		}

		if ($this->writtenFlow === NULL) {
			// Write slot is free: send the packet right away
			$this->stream->write($firstPacket);
			$this->writtenFlow = $envelope;

			$this->handleSend();

		} else {
			// Another flow occupies the write slot: queue this one
			$this->sendingFlows[] = $envelope;
		}

		return $deferred->promise();
	}
749
750
	/**
751
	 * Continues the given flow
752
	 *
753
	 * @param Flow\Envelope $flow
754
	 * @param Mqtt\Packet $packet
755
	 *
756
	 * @return void
757
	 */
758
	private function continueFlow(Flow\Envelope $flow, Mqtt\Packet $packet) : void
	{
		try {
			$reply = $flow->next($packet);

		} catch (\Exception $ex) {
			// A flow that fails on the received packet is reported and dropped
			$this->onError($ex, $this);

			return;
		}

		// No reply packet: complete the flow on the next loop tick if it is finished
		if ($reply === NULL) {
			if ($flow->isFinished()) {
				$this->loop->futureTick(function () use ($flow) {
					$this->finishFlow($flow);
				});
			}

			return;
		}

		if ($this->writtenFlow === NULL) {
			// Write slot is free: send the reply right away
			$this->stream->write($reply);
			$this->writtenFlow = $flow;

			$this->handleSend();

		} else {
			// Another flow occupies the write slot: queue this one
			$this->sendingFlows[] = $flow;
		}
	}
786
787
	/**
788
	 * Finishes the given flow
789
	 *
790
	 * @param Flow\Envelope $flow
791
	 *
792
	 * @return void
793
	 */
794
	private function finishFlow(Flow\Envelope $flow) : void
	{
		// Failed flow: warn the listeners and reject the flow's promise
		if (!$flow->isSuccess()) {
			$failure = new Exceptions\RuntimeException($flow->getErrorMessage());
			$this->onWarning($failure, $this);

			$flow->getDeferred()->reject($failure);

			return;
		}

		// Silent flows settle their promise without firing an event
		if (!$flow->isSilent()) {
			switch ($flow->getCode()) {
				case 'ping':
					$this->onPing($this);
					break;

				case 'pong':
					$this->onPong($this);
					break;

				case 'connect':
					$this->onConnect($flow->getResult(), $this);
					break;

				case 'disconnect':
					$this->onDisconnect($flow->getResult(), $this);
					break;

				case 'publish':
					$this->onPublish($flow->getResult(), $this);
					break;

				case 'subscribe':
					$this->onSubscribe($flow->getResult(), $this);
					break;

				case 'unsubscribe':
					$this->onUnsubscribe($flow->getResult(), $this);
					break;

				case 'message':
					/** @var Mqtt\Message $message */
					$message = $flow->getResult();
					$previous = $this->lastMessage;

					if ($previous === NULL) {
						$this->lastMessage = $flow->getResult();

					} elseif (
						$previous->getTopic() === $message->getTopic()
						&& $previous->getPayload() === $message->getPayload()
					) {
						// Same topic and payload as the previous message: no onMessage event is fired
						break;
					}

					$this->lastMessage = $message;

					$this->onMessage($message, $this);
					break;
			}
		}

		$flow->getDeferred()->resolve($flow->getResult());
	}
857
858
	/**
859
	 * @return void
860
	 */
861
	private function createConnector() : void
	{
		// Plain TCP is the base transport; the optional layers below wrap it
		$this->connector = new Socket\TcpConnector($this->loop);

		// Optional DNS resolution through a cached resolver for the configured DNS address
		if ($this->configuration->isDNSEnabled()) {
			$resolver = (new Dns\Resolver\Factory)->createCached($this->configuration->getDNSAddress(), $this->loop);

			$this->connector = new Socket\DnsConnector($this->connector, $resolver);
		}

		// Optional secure layer on top of whatever connector was built so far
		if ($this->configuration->isSSLEnabled()) {
			$sslOptions = $this->configuration->getSSLConfiguration();

			$this->connector = new Socket\SecureConnector($this->connector, $this->loop, $sslOptions);
		}
	}
875
}
876