Passed
Pull Request — master (#14)
by Camilo
01:32
created

Client::initializeObject()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT;
6
7
use DateTime;
8
use DateTimeImmutable;
9
use LogicException;
10
use unreal4u\MQTT\Application\EmptyWritableResponse;
11
use unreal4u\MQTT\Application\PacketIdentifierStack;
12
use unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined;
13
use unreal4u\MQTT\Exceptions\NonMatchingPacketIdentifiers;
14
use unreal4u\MQTT\Exceptions\NotConnected;
15
use unreal4u\MQTT\Exceptions\ServerClosedConnection;
16
use unreal4u\MQTT\Internals\ClientInterface;
17
use unreal4u\MQTT\Internals\ProtocolBase;
18
use unreal4u\MQTT\Internals\ReadableContentInterface;
19
use unreal4u\MQTT\Internals\WritableContentInterface;
20
use unreal4u\MQTT\Protocol\Connect;
21
use unreal4u\MQTT\Protocol\Disconnect;
22
23
use function array_key_exists;
24
use function array_keys;
25
use function floor;
26
use function fread;
27
use function fwrite;
28
use function get_class;
29
use function microtime;
30
use function sprintf;
31
use function stream_get_meta_data;
32
use function stream_set_blocking;
33
use function stream_set_timeout;
34
use function stream_socket_client;
35
use function stream_socket_shutdown;
36
use function strlen;
37
38
/**
39
 * Example working client that implements all methods from the mandatory ClientInterface
40
 * @package unreal4u\MQTT
41
 */
42
final class Client extends ProtocolBase implements ClientInterface
43
{
44
    /**
45
     * Stream resource used to talk to the broker.
46
     * Assigned when a socket connection is created and reset to null by setConnected(false).
     * @var resource|null
47
     */
48
    private $socket;
49
50
    /**
51
     * Fast way to know whether we are connected or not; changed through setConnected()
52
     * @var bool
53
     */
54
    private $isConnected = false;
55
56
    /**
57
     * Fast way to know whether we are currently in locked mode or not.
58
     * Holds the last status that was passed to enableSynchronousTransfer().
     * @var bool
59
     */
60
    private $isCurrentlyLocked = false;
61
62
    /**
63
     * Annotates the last time there was known to be communication with the MQTT server.
64
     * Stays null until updateLastCommunication() is called for the first time.
     * @var DateTimeImmutable|null
65
     */
66
    private $lastCommunication;
67
68
    /**
69
     * Internal holder of connection parameters.
70
     * Filled from the Connect object when the socket connection is generated, null before that.
     * @var Connect\Parameters|null
71
     */
72
    private $connectionParameters;
73
74
    /**
75
     * Temporary holder for async requests so that they can be handled synchronously.
76
     * Indexed by the control packet value of the stored object.
     * @var WritableContentInterface[]
77
     */
78
    private $objectStack = [];
79
80
    /**
81
     * Stack of sent objects that carry an active packet identifier; created in initializeObject()
     * @var PacketIdentifierStack
82
     */
83
    private $packetIdentifierStack;
84
85
    /**
     * Creates the packet identifier stack and then hands over to the parent initialization
     *
     * @return ProtocolBase The value returned by the parent implementation
     */
    protected function initializeObject(): ProtocolBase
    {
        $identifierStack = new PacketIdentifierStack();
        $this->packetIdentifierStack = $identifierStack;

        return parent::initializeObject();
    }
90
91
    /**
     * Sends a Disconnect packet to the broker when the client is still flagged as connected
     *
     * @inheritdoc
     * @throws LogicException
     * @throws NonMatchingPacketIdentifiers
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     * @throws ServerClosedConnection
     */
    public function __destruct()
    {
        // Nothing to clean up when there is no active connection
        if ($this->isConnected() !== true) {
            return;
        }

        $this->logger->info('Currently connected to broker, disconnecting from it');

        $disconnect = new Disconnect($this->logger);
        $this->processObject($disconnect);
    }
107
108
    /**
     * Shuts down both the reading and the writing side of the socket.
     *
     * The socket property is null before a connection exists and after setConnected(false). In that case there
     * is nothing to shut down, so false is returned instead of passing null to stream_socket_shutdown(), which
     * would end in a TypeError because this file runs with strict types.
     *
     * @inheritdoc
     * @return bool True when the socket was shut down, false on failure or when no socket is set
     */
    public function shutdownConnection(): bool
    {
        if ($this->socket === null) {
            return false;
        }

        return stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    }
115
116
    /**
     * Reads up to the requested number of bytes from the socket and returns them as a string
     *
     * A failed read (fread() returning false) is cast to an empty string.
     *
     * @inheritdoc
     * @param int $bytes Maximum number of bytes to read
     * @return string
     */
    public function readBrokerData(int $bytes): string
    {
        $logContext = [
            'numberOfBytes' => $bytes,
            'isLocked' => $this->isCurrentlyLocked,
        ];
        $this->logger->debug('Reading bytes from socket', $logContext);

        $rawData = fread($this->socket, $bytes);

        return (string) $rawData;
    }
127
128
    /**
     * Reads the header of a broker response, which is requested as 4 bytes from the socket
     *
     * @inheritdoc
     * @return string
     */
    public function readBrokerHeader(): string
    {
        $this->logger->debug('Reading header from response');

        $headerLength = 4;

        return $this->readBrokerData($headerLength);
    }
136
137
    /**
     * Writes the sendable message of the given object to the socket and returns the answer, if one is expected
     *
     * @inheritdoc
     * @param WritableContentInterface $object
     * @return string The header read from the broker, or an empty string when no answer is expected
     * @throws ServerClosedConnection When fewer bytes than the message length could be written
     * @throws NotConnected When no socket is available
     */
    public function sendBrokerData(WritableContentInterface $object): string
    {
        if ($this->socket === null) {
            $this->logger->alert('Not connected before sending data');
            throw new NotConnected('Please connect before performing any other request');
        }

        $payload = $object->createSendableMessage();
        $payloadLength = strlen($payload);
        $bytesWritten = fwrite($this->socket, $payload, $payloadLength);

        // Same context for the failure and the success log entry
        $transferSummary = ['writtenBytes' => $bytesWritten, 'sizeOfString' => $payloadLength];

        // Also covers fwrite() returning false
        if ($bytesWritten !== $payloadLength) {
            $this->logger->error('Written bytes do NOT correspond with size of string!', $transferSummary);

            throw new ServerClosedConnection('The server may have disconnected the current client');
        }

        if ($object->hasActivePacketIdentifier()) {
            $this->packetIdentifierStack->add($object);
        }

        $this->logger->debug('Sent data to socket', $transferSummary);

        return $this->checkAndReturnAnswer($object);
    }
169
170
    /**
     * Reads the broker's response header when the written object expects an answer
     *
     * The socket is switched to blocking mode for the duration of the read and back to non-blocking afterwards.
     *
     * @param WritableContentInterface $object
     * @return string The response header, or an empty string when no answer is expected
     */
    private function checkAndReturnAnswer(WritableContentInterface $object): string
    {
        if ($object->shouldExpectAnswer() !== true) {
            return '';
        }

        $this->enableSynchronousTransfer(true);
        $brokerHeader = $this->readBrokerHeader();
        $this->enableSynchronousTransfer(false);

        return $brokerHeader;
    }
187
188
    /**
     * Verifies the outcome of a connection attempt, throwing when it did not succeed
     *
     * The attempt counts as successful only when the error code is 0 and a socket has been stored.
     *
     * @param int $errorCode
     * @param string $errorDescription
     * @return Client
     * @throws NotConnected
     */
    private function checkForConnectionErrors(int $errorCode, string $errorDescription): self
    {
        $connectionSucceeded = $errorCode === 0 && $this->socket !== null;
        if ($connectionSucceeded) {
            return $this;
        }

        $failureDetails = [
            'errorCode' => $errorCode,
            'errorDescription' => $errorDescription,
        ];
        $this->logger->critical('Could not connect to broker', $failureDetails);

        throw new NotConnected('Could not connect to broker: ' . $errorDescription, $errorCode);
    }
209
210
    /**
     * Opens the stream socket towards the broker described by the Connect object
     *
     * Stores the connection parameters, creates the socket with a 60 second connection timeout, validates the
     * outcome and finally applies the read timeout to the socket.
     *
     * @param Connect $connection
     * @return bool Always true; failures are reported through exceptions
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     */
    private function generateSocketConnection(Connect $connection): bool
    {
        $this->logger->debug('Creating socket connection');
        $this->connectionParameters = $connection->getConnectionParameters();

        $connectionUrl = $this->connectionParameters->getConnectionUrl();
        $newSocket = stream_socket_client($connectionUrl, $errorCode, $errorDescription, 60, STREAM_CLIENT_CONNECT);
        // Only keep what is an actual stream; false is returned on failure
        if ($newSocket !== false) {
            $this->socket = $newSocket;
        }

        $this->checkForConnectionErrors($errorCode, $errorDescription);
        $this->setSocketTimeout();

        $socketMetaData = stream_get_meta_data($this->socket);
        $this->logger->debug('Created socket connection successfully, continuing', $socketMetaData);

        return true;
    }
240
241
    /**
     * Calculates and sets the timeout on the socket connection according to the MQTT standard
     *
     * 1.5 times the keep alive period is the maximum amount of time the connection may remain idle before the
     * broker decides to close it.
     * Odd keep alive periods leave an extra 0.5 seconds, which is passed on to the socket as microseconds.
     *
     * @return Client
     */
    private function setSocketTimeout(): self
    {
        $timeCalculation = $this->connectionParameters->getKeepAlivePeriod() * 1.5;
        $seconds = (int) floor($timeCalculation);
        // The multiplication must happen BEFORE the cast: a cast binds tighter than "*", so casting first always
        // truncates the fraction to 0. stream_set_timeout() expects the remainder in microseconds, hence 1000000.
        $microseconds = (int) (($timeCalculation - $seconds) * 1000000);
        stream_set_timeout($this->socket, $seconds, $microseconds);

        return $this;
    }
258
259
    /**
     * Switches the socket between blocking and non-blocking mode and remembers the requested mode
     *
     * @inheritdoc
     * @param bool $newStatus True for blocking mode, false for non-blocking mode
     * @return ClientInterface
     */
    public function enableSynchronousTransfer(bool $newStatus): ClientInterface
    {
        $logContext = ['newStatus' => $newStatus];
        $this->logger->debug('Setting new blocking status', $logContext);

        stream_set_blocking($this->socket, $newStatus);
        $this->isCurrentlyLocked = $newStatus;

        return $this;
    }
269
270
    /**
     * Preparations that run before any data is sent through the socket
     *
     * The object is stored in the object stack under its control packet value. A Connect object additionally
     * triggers the creation of the socket connection.
     *
     * @param WritableContentInterface $object
     * @return Client
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     */
    private function preSocketCommunication(WritableContentInterface $object): self
    {
        $controlPacketValue = $object::getControlPacketValue();
        $this->objectStack[$controlPacketValue] = $object;

        if ($object instanceof Connect) {
            $this->generateSocketConnection($object);
        }

        return $this;
    }
288
289
    /**
     * Looks up the object in the stack that originated the given readable content
     *
     * A matching object is removed from the stack and returned. Without a match an EmptyWritableResponse is
     * returned; a warning is logged first unless the origin control packet is 0.
     *
     * @param ReadableContentInterface $readableContent
     * @return WritableContentInterface
     */
    private function postSocketCommunication(ReadableContentInterface $readableContent): WritableContentInterface
    {
        $originKey = $readableContent->getOriginControlPacket();

        if (array_key_exists($originKey, $this->objectStack)) {
            $this->logger->debug('Origin packet found, returning it', ['originKey' => $originKey]);
            $matchingPacket = $this->objectStack[$originKey];
            unset($this->objectStack[$originKey]);

            return $matchingPacket;
        }

        // An origin of 0 silently gets an empty response, any other unknown origin is worth a warning
        if ($originKey !== 0) {
            $this->logger->warning('No origin packet found!', [
                'originKey' => $originKey,
                'stack' => array_keys($this->objectStack),
            ]);
        }

        return new EmptyWritableResponse($this->logger);
    }
318
319
    /**
     * Sends the given object to the broker and returns the readable content that answers it
     *
     * @inheritdoc
     * @param WritableContentInterface $object
     * @return ReadableContentInterface
     * @throws LogicException
     * @throws NonMatchingPacketIdentifiers
     * @throws ServerClosedConnection
     * @throws NoConnectionParametersDefined
     * @throws NotConnected
     */
    public function processObject(WritableContentInterface $object): ReadableContentInterface
    {
        $objectClass = get_class($object);
        $this->logger->debug('Validating object', ['object' => $objectClass]);

        $this->preSocketCommunication($object);

        $this->logger->info('About to send data', ['object' => $objectClass]);
        $brokerAnswer = $this->sendBrokerData($object);
        $readableContent = $object->expectAnswer($brokerAnswer, $this);

        /*
         * Some objects must perform certain actions on the connection, for example:
         * - ConnAck must set the connected bit
         * - PingResp must reset the internal last-communication datetime
         */
        $this->logger->debug('Checking stack and performing special operations', [
            'originObject' => $objectClass,
            'responseObject' => get_class($readableContent),
        ]);

        $originPacket = $this->postSocketCommunication($readableContent);
        $readableContent->performSpecialActions($this, $originPacket);

        return $readableContent;
    }
350
351
    /**
     * Tells whether the keep alive period has elapsed since the last known communication with the broker
     *
     * Returns false when no communication has been registered yet or when no connection parameters are known:
     * both properties are null until then, and dereferencing them would end in a fatal error.
     *
     * @inheritdoc
     * @return bool True when connected, the keep alive period is greater than 0 and it has elapsed
     */
    public function isItPingTime(): bool
    {
        // Without a reference point or a keep alive period there is nothing to compare against
        if ($this->lastCommunication === null || $this->connectionParameters === null) {
            return false;
        }

        $keepAlivePeriod = $this->connectionParameters->getKeepAlivePeriod();
        $secondsDifference = (new DateTime('now'))->getTimestamp() - $this->lastCommunication->getTimestamp();
        $this->logger->debug('Checking time difference', [
            'secondsDifference' => $secondsDifference,
            'keepAlivePeriod' => $keepAlivePeriod,
        ]);

        return $this->isConnected()
            && $keepAlivePeriod > 0
            && $secondsDifference >= $keepAlivePeriod;
    }
369
370
    /**
     * Registers the current moment as the last time there was communication with the broker
     *
     * DateTimeImmutable::createFromFormat() returns false on failure. That value must never reach the
     * $lastCommunication property, so in that case a timestamp with whole-second precision is stored instead.
     *
     * @inheritdoc
     * @return ClientInterface
     */
    public function updateLastCommunication(): ClientInterface
    {
        $previousValue = null;
        if ($this->lastCommunication !== null) {
            $previousValue = $this->lastCommunication->format('Y-m-d H:i:s.u');
        }

        // "now" does not support microseconds, so create the timestamp with a format that does
        $currentMoment = DateTimeImmutable::createFromFormat('U.u', sprintf('%.6F', microtime(true)));
        if ($currentMoment === false) {
            // The "@timestamp" notation yields UTC as well, just like the "U.u" format above
            $currentMoment = new DateTimeImmutable('@' . (int) microtime(true));
        }
        $this->lastCommunication = $currentMoment;

        $this->logger->debug('Updating internal last communication timestamp', [
            'previousValue' => $previousValue,
            'currentValue' => $this->lastCommunication->format('Y-m-d H:i:s.u'),
        ]);

        return $this;
    }
387
388
    /**
     * Stores the connected flag and drops the socket reference when the client is marked as disconnected
     *
     * @inheritdoc
     * @param bool $isConnected
     * @return ClientInterface
     */
    public function setConnected(bool $isConnected): ClientInterface
    {
        $logContext = ['connected' => $isConnected];
        $this->logger->debug('Setting internal connected property', $logContext);

        $this->isConnected = $isConnected;
        // A disconnected client must not hold on to its socket any longer
        if ($isConnected === false) {
            $this->socket = null;
        }

        return $this;
    }
401
402
    /**
     * Returns the connected flag as it was last set through setConnected()
     *
     * @inheritdoc
     * @return bool
     */
    public function isConnected(): bool
    {
        $connected = $this->isConnected;

        return $connected;
    }
409
}
410