Passed
Push — master ( 578a67...de03ca )
by Camilo
01:57
created

Publish::checkForValidPacketIdentifier()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 8
c 0
b 0
f 0
ccs 5
cts 5
cp 1
rs 10
cc 2
nc 2
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use InvalidArgumentException;
8
use OutOfBoundsException;
9
use OutOfRangeException;
10
use unreal4u\MQTT\Application\EmptyReadableResponse;
11
use unreal4u\MQTT\DataTypes\Message;
12
use unreal4u\MQTT\DataTypes\PacketIdentifier;
13
use unreal4u\MQTT\DataTypes\QoSLevel;
14
use unreal4u\MQTT\DataTypes\TopicName;
15
use unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined;
16
use unreal4u\MQTT\Exceptions\InvalidQoSLevel;
17
use unreal4u\MQTT\Exceptions\InvalidRequest;
18
use unreal4u\MQTT\Exceptions\InvalidResponseType;
19
use unreal4u\MQTT\Exceptions\MessageTooBig;
20
use unreal4u\MQTT\Exceptions\MissingTopicName;
21
use unreal4u\MQTT\Exceptions\NotConnected;
22
use unreal4u\MQTT\Internals\ClientInterface;
23
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
24
use unreal4u\MQTT\Internals\ProtocolBase;
25
use unreal4u\MQTT\Internals\ReadableContent;
26
use unreal4u\MQTT\Internals\ReadableContentInterface;
27
use unreal4u\MQTT\Internals\WritableContent;
28
use unreal4u\MQTT\Internals\WritableContentInterface;
29
use unreal4u\MQTT\Utilities;
30
31
use function decbin;
32
use function ord;
33
use function sprintf;
34
use function strlen;
35
use function substr;
36
37
/**
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
 *
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
 *
 * QoS lvl1:
 *   First packet: PUBLISH
 *   Second packet: PUBACK
 *
 * QoS lvl2:
 *   First packet: PUBLISH
 *   Second packet: PUBREC
 *   Third packet: PUBREL
 *   Fourth packet: PUBCOMP
 *
 * @see https://go.gliffy.com/go/publish/12498076
 */
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
{
    use ReadableContent;
    use /** @noinspection TraitsPropertiesConflictsInspection */
        WritableContent;
    use PacketIdentifierFunctionality;

    private const CONTROL_PACKET_VALUE = 3;

    /**
     * Contains the message to be sent (or the message that was received, once fillObject() has run)
     * @var Message
     */
    private $message;

    /**
     * Flag to check whether a message is a redelivery (DUP flag)
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
     * @var bool
     */
    public $isRedelivery = false;

    /**
     * Builds the variable header: the UTF-8 encoded topic name, followed by the packet identifier when QoS > 0
     *
     * As a side effect the special flags are reset and recalculated for the current message.
     *
     * @return string
     * @throws InvalidQoSLevel
     * @throws MissingTopicName
     * @throws OutOfRangeException
     * @throws InvalidArgumentException When no message has been set yet
     */
    public function createVariableHeader(): string
    {
        if ($this->message === null) {
            throw new InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        $variableHeaderContents = $this->createUTF8String($this->message->getTopicName());
        // Reset the special flags should the same object be reused with another message
        $this->specialFlags = 0;

        $variableHeaderContents .= $this->createVariableHeaderFlags();

        return $variableHeaderContents;
    }

    /**
     * Sets some common flags and returns the variable header string should there be one
     *
     * The DUP (8), RETAIN (1) and QoS (level * 2) bits are OR-ed into the special flags. Only for QoS levels other
     * than 0 is a non-empty string returned: the binary representation of the packet identifier.
     *
     * @return string
     * @throws OutOfRangeException
     * @throws InvalidQoSLevel
     */
    private function createVariableHeaderFlags(): string
    {
        if ($this->isRedelivery) {
            // DUP flag: if the message is a re-delivery, mark it as such
            $this->specialFlags |= 8;
            $this->logger->debug('Activating redelivery bit');
        }

        if ($this->message->isRetained()) {
            // RETAIN flag: should the server retain the message?
            $this->specialFlags |= 1;
            $this->logger->debug('Activating retain flag');
        }

        // Check QoS level and perform the corresponding actions
        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel !== 0) {
            // Adds 2 for QoS lvl1 and 4 for QoS lvl2 (QoS lvl0 adds nothing and never reaches this branch)
            $this->specialFlags |= ($qosLevel * 2);
            $this->logger->debug(sprintf('Activating QoS level %d bit', $qosLevel));
            return $this->getPacketIdentifierBinaryRepresentation();
        }

        return '';
    }

    /**
     * Returns the payload of the message that has been set
     *
     * @return string
     * @throws MissingTopicName
     * @throws MessageTooBig
     * @throws InvalidArgumentException When no message has been set yet
     */
    public function createPayload(): string
    {
        if ($this->message === null) {
            throw new InvalidArgumentException('A message must be set before publishing');
        }

        return $this->message->getPayload();
    }

    /**
     * QoS level 0 does not have to wait for a answer, so return false. Any other QoS level returns true
     *
     * @return bool
     * @throws InvalidQoSLevel
     */
    public function shouldExpectAnswer(): bool
    {
        $shouldExpectAnswer = $this->message->getQoSLevel() !== 0;
        $this->logger->debug('Checking whether we should expect an answer or not', [
            'shouldExpectAnswer' => $shouldExpectAnswer,
        ]);

        return $shouldExpectAnswer;
    }

    /**
     * Creates the object that corresponds to the answer expected for the QoS level of the current message
     *
     * QoS lvl1 yields a PubAck and QoS lvl2 a PubRec, both instantiated from the broker bit stream. Any other level
     * yields an EmptyReadableResponse.
     *
     * @param string $brokerBitStream
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws InvalidResponseType
     */
    public function expectAnswer(string $brokerBitStream, ClientInterface $client): ReadableContentInterface
    {
        switch ($this->message->getQoSLevel()) {
            case 1:
                $pubAck = new PubAck($this->logger);
                $pubAck->instantiateObject($brokerBitStream, $client);
                return $pubAck;
            case 2:
                $pubRec = new PubRec($this->logger);
                $pubRec->instantiateObject($brokerBitStream, $client);
                return $pubRec;
            case 0:
            default:
                return new EmptyReadableResponse($this->logger);
        }
    }

    /**
     * Sets the to be sent message
     *
     * @param Message $message
     * @return WritableContentInterface
     */
    public function setMessage(Message $message): WritableContentInterface
    {
        $this->message = $message;
        return $this;
    }

    /**
     * Gets the set message
     *
     * @return Message
     */
    public function getMessage(): Message
    {
        return $this->message;
    }

    /**
     * Sets several bits and pieces from the first byte of the fixed header for the Publish packet
     *
     * Applies the retain flag (bit 0) and the given QoS level to the current message, and derives the redelivery
     * flag (bit 3), which is only honoured when the QoS level is not 0.
     *
     * @param int $firstByte
     * @param QoSLevel $qoSLevel
     * @return Publish
     * @throws InvalidQoSLevel
     */
    private function analyzeFirstByte(int $firstByte, QoSLevel $qoSLevel): Publish
    {
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);
        // Retained bit is bit 0 of first byte
        $this->message->setRetainFlag(false);
        if (($firstByte & 1) === 1) {
            $this->logger->debug('Setting retain flag to true');
            $this->message->setRetainFlag(true);
        }
        // QoS level is already been taken care of, assign it to the message at this point
        $this->message->setQoSLevel($qoSLevel);

        // Duplicate message must be checked only on QoS > 0, else set it to false
        $this->isRedelivery = false;
        if (($firstByte & 8) === 8 && $this->message->getQoSLevel() !== 0) {
            // Is a duplicate is always bit 3 of first byte
            $this->isRedelivery = true;
            $this->logger->debug('Setting redelivery bit');
        }

        return $this;
    }

    /**
     * Finds out the QoS level in a fixed header for the Publish object
     *
     * @param int $bitString The first byte of the fixed header as an integer
     * @return QoSLevel
     * @throws InvalidQoSLevel
     */
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
    {
        // QoS lvl are in bit positions 1-2. Shifting is strictly speaking not needed, but increases human comprehension
        $shiftedBits = $bitString >> 1;
        $incomingQoSLevel = 0;
        if (($shiftedBits & 1) === 1) {
            $incomingQoSLevel = 1;
        }
        // Checked last on purpose: when bit 2 is set, level 2 wins over whatever bit 1 said
        if (($shiftedBits & 2) === 2) {
            $incomingQoSLevel = 2;
        }

        $this->logger->debug('Setting QoS level', ['bitString' => $bitString, 'incomingQoSLevel' => $incomingQoSLevel]);
        return new QoSLevel($incomingQoSLevel);
    }

    /**
     * Reads whatever is still missing from the broker so that the returned string holds the complete packet
     *
     * @param string $rawMQTTHeaders The bytes that have been read so far
     * @param ClientInterface $client Used to read the remaining bytes from the broker
     * @return string
     * @throws OutOfBoundsException
     * @throws InvalidArgumentException
     * @throws MessageTooBig
     * @throws InvalidQoSLevel
     */
    private function completePossibleIncompleteMessage(string $rawMQTTHeaders, ClientInterface $client): string
    {
        // Read at least one extra byte from the stream if we know that the message is too short
        if (strlen($rawMQTTHeaders) < 2) {
            $rawMQTTHeaders .= $client->readBrokerData(1);
        }

        $restOfBytes = $this->performRemainingLengthFieldOperations($rawMQTTHeaders, $client);

        /*
         * A complete message consists of:
         *  - The very first byte
         *  - The size of the remaining length field (from 1 to 4 bytes)
         *  - The $restOfBytes
         *
         * So we have to compare what we already have vs the above calculation
         *
         * More information:
         * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180832
         */
        $completeMessageSize = $restOfBytes + $this->sizeOfRemainingLengthField + 1;
        $alreadyRead = strlen($rawMQTTHeaders);
        if ($alreadyRead < $completeMessageSize) {
            // Read only the portion of data we have left from the socket
            $rawMQTTHeaders .= $client->readBrokerData($completeMessageSize - $alreadyRead);
        }

        return $rawMQTTHeaders;
    }

    /**
     * Will perform sanity checks and fill in the Readable object with data
     *
     * Layout of the complete packet as parsed below:
     *  - 1 byte: first byte of the fixed header (flags)
     *  - 1 to 4 bytes: remaining length field
     *  - 2 bytes: topic name size (big-endian)
     *  - N bytes: topic name
     *  - 2 bytes: packet identifier (only when QoS > 0)
     *  - rest: payload
     *
     * @param string $rawMQTTHeaders
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws MessageTooBig
     * @throws OutOfBoundsException
     * @throws InvalidQoSLevel
     * @throws InvalidArgumentException
     * @throws OutOfRangeException
     */
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
    {
        // Retrieve full message first
        $fullMessage = $this->completePossibleIncompleteMessage($rawMQTTHeaders, $client);
        // Handy to maintain for debugging purposes
        #$this->logger->debug('Bin data', [\unreal4u\MQTT\DebugTools::convertToBinaryRepresentation($rawMQTTHeaders)]);

        // Handy to have: the first byte
        $firstByte = ord($fullMessage[0]);
        // TopicName size takes the 2 bytes (MSB first) that follow the remaining length field (1 to 4 bytes).
        // Both bytes must be combined, otherwise topics longer than 255 bytes would be cut off
        $topicSizePosition = $this->sizeOfRemainingLengthField + 1;
        $topicSize = (ord($fullMessage[$topicSizePosition]) << 8) | ord($fullMessage[$topicSizePosition + 1]);
        // With the first byte, we can determine the QoS level of the incoming message
        $qosLevel = $this->determineIncomingQoSLevel($firstByte);

        // The topic name starts right after the 2 topic size bytes
        $topicStartPosition = $this->sizeOfRemainingLengthField + 3;
        $messageStartPosition = $topicStartPosition;
        // If we have a QoS level present, we must retrieve the packet identifier as well
        if ($qosLevel->getQoSLevel() > 0) {
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
            // The 2 packet identifier bytes are located directly after the topic name
            $this->setPacketIdentifier(new PacketIdentifier(Utilities::convertBinaryStringToNumber(
                $fullMessage[$topicStartPosition + $topicSize] .
                $fullMessage[$topicStartPosition + 1 + $topicSize]
            )));
            $this->logger->debug('Determined packet identifier', ['PI' => $this->getPacketIdentifier()]);
            $messageStartPosition += 2;
        }

        // The full packet is available at this point: build the Message object from the payload and the topic name
        $this->message = new Message(
            // Safe to assume: everything after the topic name (and the packet identifier, if any) is payload
            substr($fullMessage, $messageStartPosition + $topicSize),
            new TopicName(substr($fullMessage, $topicStartPosition, $topicSize))
        );
        $this->analyzeFirstByte($firstByte, $qosLevel);

        $this->logger->debug('Determined headers', [
            'topicSize' => $topicSize,
            'QoSLevel' => $this->message->getQoSLevel(),
            'isDuplicate' => $this->isRedelivery,
            'isRetained' => $this->message->isRetained(),
            #'packetIdentifier' => $this->packetIdentifier->getPacketIdentifierValue(), // This is not always set!
        ]);

        return $this;
    }

    /**
     * Answers a received PUBLISH: a PubAck is sent for QoS lvl1 and a PubRec for QoS lvl2, nothing for QoS lvl0
     *
     * @inheritdoc
     * @throws InvalidRequest
     * @throws InvalidQoSLevel
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     */
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
    {
        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel === 0) {
            $this->logger->debug('No response needed', ['qosLevel' => $qosLevel]);
        } elseif ($qosLevel === 1) {
            $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubAckAnswer());
        } elseif ($qosLevel === 2) {
            $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubRecAnswer());
        }

        return true;
    }

    /**
     * Composes a PubRec answer with the same packetIdentifier as what we received
     *
     * @return PubRec
     * @throws InvalidRequest
     */
    private function composePubRecAnswer(): PubRec
    {
        $this->checkForValidPacketIdentifier();
        $pubRec = new PubRec($this->logger);
        $pubRec->setPacketIdentifier($this->packetIdentifier);
        return $pubRec;
    }

    /**
     * Composes a PubAck answer with the same packetIdentifier as what we received
     *
     * @return PubAck
     * @throws InvalidRequest
     */
    private function composePubAckAnswer(): PubAck
    {
        $this->checkForValidPacketIdentifier();
        $pubAck = new PubAck($this->logger);
        $pubAck->setPacketIdentifier($this->packetIdentifier);
        return $pubAck;
    }

    /**
     * Will check whether the current object has a packet identifier set. If not, we are in serious problems!
     *
     * @return Publish
     * @throws InvalidRequest When no packet identifier has been set
     */
    private function checkForValidPacketIdentifier(): self
    {
        if ($this->packetIdentifier === null) {
            $this->logger->critical('No valid packet identifier found at a stage where there MUST be one set');
            throw new InvalidRequest('You are trying to send a request without a valid packet identifier');
        }

        return $this;
    }

    /**
     * PUBLISH packet is the exception to the rule: it is not started on base of a packet that gets sent by us
     *
     * @return int Always 0
     */
    public function getOriginControlPacket(): int
    {
        return 0;
    }
}
431