Client   A
last analyzed

Complexity

Total Complexity 31

Size/Duplication

Total Lines 351
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 31
eloc 114
dl 0
loc 351
rs 9.92
c 2
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A checkAndReturnAnswer() 0 10 2
A shutdownConnection() 0 3 1
A checkForConnectionErrors() 0 12 3
A __destruct() 0 6 2
A readBrokerHeader() 0 4 1
A readBrokerData() 0 7 1
A sendBrokerData() 0 22 3
A updateLastCommunication() 0 13 2
A isConnected() 0 3 1
A processObject() 0 22 1
A setSocketTimeout() 0 7 1
A enableSynchronousTransfer() 0 6 1
A generateSocketConnection() 0 21 2
A postSocketCommunication() 0 21 3
A isItPingTime() 0 12 3
A setConnected() 0 9 2
A preSocketCommunication() 0 9 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT;
6
7
use DateTime;
8
use DateTimeImmutable;
9
use LogicException;
10
use unreal4u\MQTT\Application\EmptyWritableResponse;
11
use unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined;
12
use unreal4u\MQTT\Exceptions\NonMatchingPacketIdentifiers;
13
use unreal4u\MQTT\Exceptions\NotConnected;
14
use unreal4u\MQTT\Exceptions\ServerClosedConnection;
15
use unreal4u\MQTT\Internals\ClientInterface;
16
use unreal4u\MQTT\Internals\ProtocolBase;
17
use unreal4u\MQTT\Internals\ReadableContentInterface;
18
use unreal4u\MQTT\Internals\WritableContentInterface;
19
use unreal4u\MQTT\Protocol\Connect;
20
use unreal4u\MQTT\Protocol\Disconnect;
21
22
use function array_key_exists;
23
use function array_keys;
24
use function floor;
25
use function fread;
26
use function fwrite;
27
use function get_class;
28
use function microtime;
29
use function sprintf;
30
use function stream_get_meta_data;
31
use function stream_set_blocking;
32
use function stream_set_timeout;
33
use function stream_socket_client;
34
use function stream_socket_shutdown;
35
use function strlen;
36
37
/**
38
 * Example working client that implements all methods from the mandatory ClientInterface
39
 * @package unreal4u\MQTT
40
 */
41
final class Client extends ProtocolBase implements ClientInterface
42
{
43
    /**
44
     * Where all the magic happens
45
     * @var Resource|null
46
     */
47
    private $socket;
48
49
    /**
50
     * Fast way to know whether we are connected or not
51
     * @var bool
52
     */
53
    private $isConnected = false;
54
55
    /**
56
     * Fast way to know whether we are currently in locked mode or not
57
     * @var bool
58
     */
59
    private $isCurrentlyLocked = false;
60
61
    /**
62
     * Annotates the last time there was known to be communication with the MQTT server
63
     * @var DateTimeImmutable
64
     */
65
    private $lastCommunication;
66
67
    /**
68
     * Internal holder of connection parameters
69
     * @var Connect\Parameters
70
     */
71
    private $connectionParameters;
72
73
    /**
74
     * Temporary holder for async requests so that they can be handled synchronously
75
     * @var WritableContentInterface[]
76
     */
77
    private $objectStack = [];
78
79
    /**
80
     * @inheritdoc
81
     * @throws LogicException
82
     * @throws NonMatchingPacketIdentifiers
83
     * @throws NotConnected
84
     * @throws NoConnectionParametersDefined
85
     * @throws ServerClosedConnection
86
     */
87
    public function __destruct()
88
    {
89
        if ($this->isConnected() === true) {
90
            $this->logger->info('Currently connected to broker, disconnecting from it');
91
92
            $this->processObject(new Disconnect($this->logger));
93
        }
94
    }
95
96
    /**
97
     * @inheritdoc
98
     */
99
    public function shutdownConnection(): bool
100
    {
101
        return stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
102
    }
103
104
    /**
105
     * @inheritdoc
106
     */
107
    public function readBrokerData(int $bytes): string
108
    {
109
        $this->logger->debug('Reading bytes from socket', [
110
            'numberOfBytes' => $bytes,
111
            'isLocked' => $this->isCurrentlyLocked,
112
        ]);
113
        return (string)fread($this->socket, $bytes);
114
    }
115
116
    /**
117
     * @inheritdoc
118
     */
119
    public function readBrokerHeader(): string
120
    {
121
        $this->logger->debug('Reading header from response');
122
        return $this->readBrokerData(4);
123
    }
124
125
    /**
126
     * @inheritdoc
127
     * @throws ServerClosedConnection
128
     * @throws NotConnected
129
     */
130
    public function sendBrokerData(WritableContentInterface $object): string
131
    {
132
        if ($this->socket === null) {
133
            $this->logger->alert('Not connected before sending data');
134
            throw new NotConnected('Please connect before performing any other request');
135
        }
136
137
        $writableString = $object->createSendableMessage();
138
        $sizeOfString = strlen($writableString);
139
        $writtenBytes = fwrite($this->socket, $writableString, $sizeOfString);
140
        // $this->logger->debug('Sent string', ['binaryString' => str2bin($writableString)]); // Handy for debugging
141
        if ($writtenBytes !== $sizeOfString) {
142
            $this->logger->error('Written bytes do NOT correspond with size of string!', [
143
                'writtenBytes' => $writtenBytes,
144
                'sizeOfString' => $sizeOfString,
145
            ]);
146
147
            throw new ServerClosedConnection('The server may have disconnected the current client');
148
        }
149
150
        $this->logger->debug('Sent data to socket', ['writtenBytes' => $writtenBytes, 'sizeOfString' => $sizeOfString]);
151
        return $this->checkAndReturnAnswer($object);
152
    }
153
154
    /**
155
     * Checks on the writable object whether we should wait for an answer and either wait or return an empty string
156
     *
157
     * @param WritableContentInterface $object
158
     * @return string
159
     */
160
    private function checkAndReturnAnswer(WritableContentInterface $object): string
161
    {
162
        $returnValue = '';
163
        if ($object->shouldExpectAnswer() === true) {
164
            $this->enableSynchronousTransfer(true);
165
            $returnValue = $this->readBrokerHeader();
166
            $this->enableSynchronousTransfer(false);
167
        }
168
169
        return $returnValue;
170
    }
171
172
    /**
173
     * Checks for socket error connections, will throw an exception if any is found
174
     *
175
     * @param int $errorCode
176
     * @param string $errorDescription
177
     * @return Client
178
     * @throws NotConnected
179
     */
180
    private function checkForConnectionErrors(int $errorCode, string $errorDescription): self
181
    {
182
        if ($errorCode !== 0 || $this->socket === null) {
183
            $this->logger->critical('Could not connect to broker', [
184
                'errorCode' => $errorCode,
185
                'errorDescription' => $errorDescription,
186
            ]);
187
188
            throw new NotConnected('Could not connect to broker: ' . $errorDescription, $errorCode);
189
        }
190
191
        return $this;
192
    }
193
194
    /**
195
     * Special handling of the connect part: create the socket
196
     *
197
     * @param Connect $connection
198
     * @return bool
199
     * @throws NotConnected
200
     * @throws NoConnectionParametersDefined
201
     */
202
    private function generateSocketConnection(Connect $connection): bool
203
    {
204
        $this->logger->debug('Creating socket connection');
205
        $this->connectionParameters = $connection->getConnectionParameters();
206
        $socket = stream_socket_client(
207
            $this->connectionParameters->getConnectionUrl(),
208
            $errorCode,
209
            $errorDescription,
210
            60,
211
            STREAM_CLIENT_CONNECT
212
        );
213
        if ($socket !== false) {
214
            $this->socket = $socket;
215
        }
216
217
        $this
218
            ->checkForConnectionErrors($errorCode, $errorDescription)
219
            ->setSocketTimeout();
220
221
        $this->logger->debug('Created socket connection successfully, continuing', stream_get_meta_data($this->socket));
222
        return true;
223
    }
224
225
    /**
226
     * Calculates and sets the timeout on the socket connection according to the MQTT standard
227
     *
228
     * 1.5 times the keep alive period is the maximum amount of time the connection may remain idle before the
229
     * broker decides to close it.
230
     * Odd numbers will also produce a 0.5 second extra time, take this into account as well
231
     *
232
     * @return Client
233
     */
234
    private function setSocketTimeout(): self
235
    {
236
        $timeCalculation = $this->connectionParameters->getKeepAlivePeriod() * 1.5;
237
        $seconds = (int) floor($timeCalculation);
238
        stream_set_timeout($this->socket, $seconds, (int)($timeCalculation - $seconds) * 1000);
239
240
        return $this;
241
    }
242
243
    /**
244
     * @inheritdoc
245
     */
246
    public function enableSynchronousTransfer(bool $newStatus): ClientInterface
247
    {
248
        $this->logger->debug('Setting new blocking status', ['newStatus' => $newStatus]);
249
        stream_set_blocking($this->socket, $newStatus);
250
        $this->isCurrentlyLocked = $newStatus;
251
        return $this;
252
    }
253
254
    /**
255
     * Stuff that has to happen before we actually begin sending data through our socket
256
     *
257
     * @param WritableContentInterface $object
258
     * @return Client
259
     * @throws NotConnected
260
     * @throws NoConnectionParametersDefined
261
     */
262
    private function preSocketCommunication(WritableContentInterface $object): self
263
    {
264
        $this->objectStack[$object::getControlPacketValue()] = $object;
265
266
        if ($object instanceof Connect) {
267
            $this->generateSocketConnection($object);
268
        }
269
270
        return $this;
271
    }
272
273
    /**
274
     * Checks in the object stack whether there is some method that might issue the current ReadableContent
275
     *
276
     * @param ReadableContentInterface $readableContent
277
     * @return WritableContentInterface
278
     * @throws LogicException
279
     */
280
    private function postSocketCommunication(ReadableContentInterface $readableContent): WritableContentInterface
281
    {
282
        $originPacket = null;
283
284
        $originPacketIdentifier = $readableContent->getOriginControlPacket();
285
        if (array_key_exists($originPacketIdentifier, $this->objectStack)) {
286
            $this->logger->debug('Origin packet found, returning it', ['originKey' => $originPacketIdentifier]);
287
            $originPacket = $this->objectStack[$originPacketIdentifier];
288
            unset($this->objectStack[$originPacketIdentifier]);
289
        } elseif ($originPacketIdentifier === 0) {
290
            $originPacket = new EmptyWritableResponse($this->logger);
291
        } else {
292
            $this->logger->warning('No origin packet found!', [
293
                'originKey' => $originPacketIdentifier,
294
                'stack' => array_keys($this->objectStack),
295
            ]);
296
            #throw new \LogicException('No origin instance could be found in the stack, please check');
297
            $originPacket = new EmptyWritableResponse($this->logger);
298
        }
299
300
        return $originPacket;
301
    }
302
303
    /**
304
     * @inheritdoc
305
     * @throws LogicException
306
     * @throws NonMatchingPacketIdentifiers
307
     * @throws ServerClosedConnection
308
     * @throws NoConnectionParametersDefined
309
     * @throws NotConnected
310
     */
311
    public function processObject(WritableContentInterface $object): ReadableContentInterface
312
    {
313
        $currentObject = get_class($object);
314
        $this->logger->debug('Validating object', ['object' => $currentObject]);
315
316
        $this->preSocketCommunication($object);
317
318
        $this->logger->info('About to send data', ['object' => $currentObject]);
319
        $readableContent = $object->expectAnswer($this->sendBrokerData($object), $this);
320
        /*
321
         * Some objects must perform certain actions on the connection, for example:
322
         * - ConnAck must set the connected bit
323
         * - PingResp must reset the internal last-communication datetime
324
         */
325
        $this->logger->debug('Checking stack and performing special operations', [
326
            'originObject' => $currentObject,
327
            'responseObject' => get_class($readableContent),
328
        ]);
329
330
        $readableContent->performSpecialActions($this, $this->postSocketCommunication($readableContent));
331
332
        return $readableContent;
333
    }
334
335
    /**
336
     * @inheritdoc
337
     */
338
    public function isItPingTime(): bool
339
    {
340
        $secondsDifference = (new DateTime('now'))->getTimestamp() - $this->lastCommunication->getTimestamp();
341
        $this->logger->debug('Checking time difference', [
342
            'secondsDifference' => $secondsDifference,
343
            'keepAlivePeriod' => $this->connectionParameters->getKeepAlivePeriod(),
344
        ]);
345
346
        return
347
            $this->isConnected() &&
348
            $this->connectionParameters->getKeepAlivePeriod() > 0 &&
349
            $secondsDifference >= $this->connectionParameters->getKeepAlivePeriod()# &&
350
            #!array_key_exists(PingReq::CONTROL_PACKET_VALUE, $this->objectStack)
351
            ;
352
    }
353
354
    /**
355
     * @inheritdoc
356
     */
357
    public function updateLastCommunication(): ClientInterface
358
    {
359
        $lastCommunication = null;
360
        if ($this->lastCommunication !== null) {
361
            $lastCommunication = $this->lastCommunication->format('Y-m-d H:i:s.u');
362
        }
363
        // "now" does not support microseconds, so create the timestamp with a format that does
364
        $this->lastCommunication = DateTimeImmutable::createFromFormat('U.u', sprintf('%.6F', microtime(true)));
0 ignored issues
show
Documentation Bug introduced by
It seems like DateTimeImmutable::creat....6F', microtime(true))) can also be of type false. However, the property $lastCommunication is declared as type DateTimeImmutable. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
365
        $this->logger->debug('Updating internal last communication timestamp', [
366
            'previousValue' => $lastCommunication,
367
            'currentValue' => $this->lastCommunication->format('Y-m-d H:i:s.u'),
368
        ]);
369
        return $this;
370
    }
371
372
    /**
373
     * @inheritdoc
374
     */
375
    public function setConnected(bool $isConnected): ClientInterface
376
    {
377
        $this->logger->debug('Setting internal connected property', ['connected' => $isConnected]);
378
        $this->isConnected = $isConnected;
379
        if ($this->isConnected() === false) {
380
            $this->socket = null;
381
        }
382
383
        return $this;
384
    }
385
386
    /**
387
     * @inheritdoc
388
     */
389
    public function isConnected(): bool
390
    {
391
        return $this->isConnected;
392
    }
393
}
394