Publish::createVariableHeader()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 4
Bugs 0 Features 0
Metric Value
eloc 6
c 4
b 0
f 0
dl 0
loc 13
ccs 7
cts 7
cp 1
rs 10
cc 2
nc 2
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use InvalidArgumentException;
8
use OutOfBoundsException;
9
use OutOfRangeException;
10
use unreal4u\MQTT\Application\EmptyReadableResponse;
11
use unreal4u\MQTT\DataTypes\Message;
12
use unreal4u\MQTT\DataTypes\PacketIdentifier;
13
use unreal4u\MQTT\DataTypes\QoSLevel;
14
use unreal4u\MQTT\DataTypes\TopicName;
15
use unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined;
16
use unreal4u\MQTT\Exceptions\InvalidQoSLevel;
17
use unreal4u\MQTT\Exceptions\InvalidRequest;
18
use unreal4u\MQTT\Exceptions\InvalidResponseType;
19
use unreal4u\MQTT\Exceptions\MessageTooBig;
20
use unreal4u\MQTT\Exceptions\MissingTopicName;
21
use unreal4u\MQTT\Exceptions\NotConnected;
22
use unreal4u\MQTT\Internals\ClientInterface;
23
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
24
use unreal4u\MQTT\Internals\ProtocolBase;
25
use unreal4u\MQTT\Internals\ReadableContent;
26
use unreal4u\MQTT\Internals\ReadableContentInterface;
27
use unreal4u\MQTT\Internals\WritableContent;
28
use unreal4u\MQTT\Internals\WritableContentInterface;
29
use unreal4u\MQTT\Utilities;
30
31
use function decbin;
32
use function ord;
33
use function sprintf;
34
use function strlen;
35
use function substr;
36
37
/**
38
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
39
 *
40
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
41
 *
42
 * QoS lvl1:
43
 *   First packet: PUBLISH
44
 *   Second packet: PUBACK
45
 *
46
 * QoS lvl2:
47
 *   First packet: PUBLISH
48
 *   Second packet: PUBREC
49
 *   Third packet: PUBREL
50
 *   Fourth packet: PUBCOMP
51
 *
52
 * @see https://go.gliffy.com/go/publish/12498076
53
 */
54
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
55
{
56 1
    use ReadableContent;
57
    use /** @noinspection TraitsPropertiesConflictsInspection */
58 1
        WritableContent;
59 1
    use PacketIdentifierFunctionality;
60
61
    private const CONTROL_PACKET_VALUE = 3;
62
63
    /**
64
     * Contains the message to be sent
65
     * @var Message
66
     */
67
    private $message;
68
69
    /**
70
     * Flag to check whether a message is a redelivery (DUP flag)
71
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
72
     * @var bool
73
     */
74
    public $isRedelivery = false;
75
76
    /**
77
     * @return string
78
     * @throws InvalidQoSLevel
79
     * @throws MissingTopicName
80
     * @throws OutOfRangeException
81
     * @throws InvalidArgumentException
82
     */
83 5
    public function createVariableHeader(): string
84
    {
85 5
        if ($this->message === null) {
86 1
            throw new InvalidArgumentException('You must at least provide a message object with a topic name');
87
        }
88
89 4
        $variableHeaderContents = $this->createUTF8String($this->message->getTopicName());
90
        // Reset the special flags should the same object be reused with another message
91 4
        $this->specialFlags = 0;
92
93 4
        $variableHeaderContents .= $this->createVariableHeaderFlags();
94
95 4
        return $variableHeaderContents;
96
    }
97
98
    /**
99
     * Sets some common flags and returns the variable header string should there be one
100
     *
101
     * @return string
102
     * @throws OutOfRangeException
103
     * @throws InvalidQoSLevel
104
     */
105 4
    private function createVariableHeaderFlags(): string
106
    {
107 4
        if ($this->isRedelivery) {
108
            // DUP flag: if the message is a re-delivery, mark it as such
109
            $this->specialFlags |= 8;
110
            $this->logger->debug('Activating redelivery bit');
111
        }
112
113 4
        if ($this->message->isRetained()) {
114
            // RETAIN flag: should the server retain the message?
115 1
            $this->specialFlags |= 1;
116 1
            $this->logger->debug('Activating retain flag');
117
        }
118
119
        // Check QoS level and perform the corresponding actions
120 4
        if ($this->message->getQoSLevel() !== 0) {
121
            // 0 for QoS lvl2 for QoS lvl1 and 4 for QoS lvl2
122 3
            $this->specialFlags |= ($this->message->getQoSLevel() * 2);
123 3
            $this->logger->debug(sprintf('Activating QoS level %d bit', $this->message->getQoSLevel()), [
124 3
                'PI' => $this->packetIdentifier->getPacketIdentifierValue(),
125
            ]);
126 3
            return $this->getPacketIdentifierBinaryRepresentation();
127
        }
128
129 1
        return '';
130
    }
131
132
    /**
133
     * @return string
134
     * @throws MissingTopicName
135
     * @throws MessageTooBig
136
     * @throws InvalidArgumentException
137
     */
138 2
    public function createPayload(): string
139
    {
140 2
        if ($this->message === null) {
141 1
            throw new InvalidArgumentException('A message must be set before publishing');
142
        }
143 1
        return $this->message->getPayload();
144
    }
145
146
    /**
147
     * QoS level 0 does not have to wait for a answer, so return false. Any other QoS level returns true
148
     * @return bool
149
     * @throws InvalidQoSLevel
150
     */
151 2
    public function shouldExpectAnswer(): bool
152
    {
153 2
        $shouldExpectAnswer = !($this->message->getQoSLevel() === 0);
154 2
        $this->logger->debug('Checking whether we should expect an answer or not', [
155 2
            'shouldExpectAnswer' => $shouldExpectAnswer,
156
        ]);
157 2
        return $shouldExpectAnswer;
158
    }
159
160
    /**
161
     * @param string $brokerBitStream
162
     * @param ClientInterface $client
163
     * @return ReadableContentInterface
164
     * @throws InvalidResponseType
165
     */
166 3
    public function expectAnswer(string $brokerBitStream, ClientInterface $client): ReadableContentInterface
167
    {
168 3
        switch ($this->message->getQoSLevel()) {
169 3
            case 1:
170 1
                $pubAck = new PubAck($this->logger);
171 1
                $pubAck->instantiateObject($brokerBitStream, $client);
172 1
                return $pubAck;
173 2
            case 2:
174 1
                $pubRec = new PubRec($this->logger);
175 1
                $pubRec->instantiateObject($brokerBitStream, $client);
176 1
                return $pubRec;
177 1
            case 0:
178
            default:
179 1
                return new EmptyReadableResponse($this->logger);
180
        }
181
    }
182
183
    /**
184
     * Sets the to be sent message
185
     *
186
     * @param Message $message
187
     * @return WritableContentInterface
188
     */
189 19
    public function setMessage(Message $message): WritableContentInterface
190
    {
191 19
        $this->message = $message;
192 19
        return $this;
193
    }
194
195
    /**
196
     * Gets the set message
197
     *
198
     * @return Message
199
     */
200 15
    public function getMessage(): Message
201
    {
202 15
        return $this->message;
203
    }
204
205
    /**
206
     * Sets several bits and pieces from the first byte of the fixed header for the Publish packet
207
     *
208
     * @param int $firstByte
209
     * @param QoSLevel $qoSLevel
210
     * @return Publish
211
     * @throws InvalidQoSLevel
212
     */
213 14
    private function analyzeFirstByte(int $firstByte, QoSLevel $qoSLevel): Publish
214
    {
215 14
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);
216
        // Retained bit is bit 0 of first byte
217 14
        $this->message->setRetainFlag(false);
218 14
        if (($firstByte & 1) === 1) {
219 4
            $this->logger->debug('Setting retain flag to true');
220 4
            $this->message->setRetainFlag(true);
221
        }
222
        // QoS level is already been taken care of, assign it to the message at this point
223 14
        $this->message->setQoSLevel($qoSLevel);
224
225
        // Duplicate message must be checked only on QoS > 0, else set it to false
226 14
        $this->isRedelivery = false;
227 14
        if (($firstByte & 8) === 8 && $this->message->getQoSLevel() !== 0) {
228
            // Is a duplicate is always bit 3 of first byte
229 2
            $this->isRedelivery = true;
230 2
            $this->logger->debug('Setting redelivery bit');
231
        }
232
233 14
        return $this;
234
    }
235
236
    /**
237
     * Finds out the QoS level in a fixed header for the Publish object
238
     *
239
     * @param int $bitString
240
     * @return QoSLevel
241
     * @throws InvalidQoSLevel
242
     */
243 11
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
244
    {
245
        // QoS lvl are in bit positions 1-2. Shifting is strictly speaking not needed, but increases human comprehension
246 11
        $shiftedBits = $bitString >> 1;
247 11
        $incomingQoSLevel = 0;
248 11
        if (($shiftedBits & 1) === 1) {
249 4
            $incomingQoSLevel = 1;
250
        }
251 11
        if (($shiftedBits & 2) === 2) {
252 2
            $incomingQoSLevel = 2;
253
        }
254
255 11
        $this->logger->debug('Setting QoS level', ['bitString' => $bitString, 'incomingQoSLevel' => $incomingQoSLevel]);
256 11
        return new QoSLevel($incomingQoSLevel);
257
    }
258
259
    /**
260
     * Gets the full message in case this object needs to
261
     *
262
     * @param string $rawMQTTHeaders
263
     * @param ClientInterface $client
264
     * @return string
265
     * @throws OutOfBoundsException
266
     * @throws InvalidArgumentException
267
     * @throws MessageTooBig
268
     * @throws InvalidQoSLevel
269
     */
270 11
    private function completePossibleIncompleteMessage(string $rawMQTTHeaders, ClientInterface $client): string
271
    {
272
        // Read at least one extra byte from the stream if we know that the message is too short
273 11
        if (strlen($rawMQTTHeaders) < 2) {
274 3
            $rawMQTTHeaders .= $client->readBrokerData(1);
275
        }
276
277 11
        $restOfBytes = $this->performRemainingLengthFieldOperations($rawMQTTHeaders, $client);
278
279
        /*
280
         * A complete message consists of:
281
         *  - The very first byte
282
         *  - The size of the remaining length field (from 1 to 4 bytes)
283
         *  - The $restOfBytes
284
         *
285
         * So we have to compare what we already have vs the above calculation
286
         *
287
         * More information:
288
         * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180832
289
         */
290 11
        if (strlen($rawMQTTHeaders) < ($restOfBytes + $this->sizeOfRemainingLengthField + 1)) {
291
            // Read only the portion of data we have left from the socket
292 6
            $readableDataLeft = ($restOfBytes + $this->sizeOfRemainingLengthField + 1) - strlen($rawMQTTHeaders);
293 6
            $rawMQTTHeaders .= $client->readBrokerData($readableDataLeft);
294
        }
295
296 11
        return $rawMQTTHeaders;
297
    }
298
299
    /**
300
     * Will perform sanity checks and fill in the Readable object with data
301
     * @param string $rawMQTTHeaders
302
     * @param ClientInterface $client
303
     * @return ReadableContentInterface
304
     * @throws MessageTooBig
305
     * @throws OutOfBoundsException
306
     * @throws InvalidQoSLevel
307
     * @throws InvalidArgumentException
308
     * @throws OutOfRangeException
309
     */
310 7
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
311
    {
312
        // Retrieve full message first
313 7
        $fullMessage = $this->completePossibleIncompleteMessage($rawMQTTHeaders, $client);
314
        // Handy to maintain for debugging purposes
315
        #$this->logger->debug('Bin data', [\unreal4u\MQTT\DebugTools::convertToBinaryRepresentation($rawMQTTHeaders)]);
316
317
        // Handy to have: the first byte
318 7
        $firstByte = ord($fullMessage[0]);
319
        // TopicName size is always on the second position after the size of the remaining length field (1 to 4 bytes)
320 7
        $topicSize = ord($fullMessage[$this->sizeOfRemainingLengthField + 2]);
321
        // With the first byte, we can determine the QoS level of the incoming message
322 7
        $qosLevel = $this->determineIncomingQoSLevel($firstByte);
323
324 7
        $messageStartPosition = $this->sizeOfRemainingLengthField + 3;
325
        // If we have a QoS level present, we must retrieve the packet identifier as well
326 7
        if ($qosLevel->getQoSLevel() > 0) {
327 3
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
328
            // [2 (fixed header) + 2 (topic size) + $topicSize] marks the beginning of the 2 packet identifier bytes
329 3
            $this->setPacketIdentifier(new PacketIdentifier(Utilities::convertBinaryStringToNumber(
330 3
                $fullMessage[$this->sizeOfRemainingLengthField + 3 + $topicSize] .
331 3
                $fullMessage[$this->sizeOfRemainingLengthField + 4 + $topicSize]
332
            )));
333 3
            $this->logger->debug('Determined packet identifier', ['PI' => $this->getPacketIdentifier()]);
334 3
            $messageStartPosition += 2;
335
        }
336
337
        // At this point $rawMQTTHeaders will be always 1 byte long, initialize a Message object with dummy data for now
338 7
        $this->message = new Message(
339
            // Save to assume a constant here: first 2 bytes will always be fixed header, next 2 bytes are topic size
340 7
            substr($fullMessage, $messageStartPosition + $topicSize),
341 7
            new TopicName(substr($fullMessage, $this->sizeOfRemainingLengthField + 3, $topicSize))
342
        );
343 7
        $this->analyzeFirstByte($firstByte, $qosLevel);
344
345 7
        $this->logger->debug('Determined headers', [
346 7
            'topicSize' => $topicSize,
347 7
            'QoSLevel' => $this->message->getQoSLevel(),
348 7
            'isDuplicate' => $this->isRedelivery,
349 7
            'isRetained' => $this->message->isRetained(),
350
            #'packetIdentifier' => $this->packetIdentifier->getPacketIdentifierValue(), // This is not always set!
351
        ]);
352
353 7
        return $this;
354
    }
355
356
    /**
357
     * @inheritdoc
358
     * @throws InvalidRequest
359
     * @throws InvalidQoSLevel
360
     * @throws NotConnected
361
     * @throws NoConnectionParametersDefined
362
     */
363 5
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
364
    {
365 5
        $qosLevel = $this->message->getQoSLevel();
366 5
        if ($qosLevel === 0) {
367 3
            $this->logger->debug('No response needed', ['qosLevel', $qosLevel]);
368 2
        } elseif ($qosLevel === 1) {
369 1
            $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
370 1
            $client->processObject($this->composePubAckAnswer());
371 1
        } elseif ($qosLevel === 2) {
372 1
            $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
373 1
            $client->processObject($this->composePubRecAnswer());
374
        }
375
376 5
        return true;
377
    }
378
379
    /**
380
     * Composes a PubRec answer with the same packetIdentifier as what we received
381
     *
382
     * @return PubRec
383
     * @throws InvalidRequest
384
     */
385 2
    private function composePubRecAnswer(): PubRec
386
    {
387 2
        $this->checkForValidPacketIdentifier();
388 2
        $pubRec = new PubRec($this->logger);
389 2
        $pubRec->setPacketIdentifier($this->packetIdentifier);
390 2
        return $pubRec;
391
    }
392
393
    /**
394
     * Composes a PubAck answer with the same packetIdentifier as what we received
395
     *
396
     * @return PubAck
397
     * @throws InvalidRequest
398
     */
399 2
    private function composePubAckAnswer(): PubAck
400
    {
401 2
        $this->checkForValidPacketIdentifier();
402 2
        $pubAck = new PubAck($this->logger);
403 2
        $pubAck->setPacketIdentifier($this->packetIdentifier);
404 2
        return $pubAck;
405
    }
406
407
    /**
408
     * Will check whether the current object has a packet identifier set. If not, we are in serious problems!
409
     *
410
     * @return Publish
411
     * @throws InvalidRequest
412
     */
413 5
    private function checkForValidPacketIdentifier(): self
414
    {
415 5
        if ($this->packetIdentifier === null) {
416 1
            $this->logger->critical('No valid packet identifier found at a stage where there MUST be one set');
417 1
            throw new InvalidRequest('You are trying to send a request without a valid packet identifier');
418
        }
419
420 4
        return $this;
421
    }
422
423
    /**
424
     * PUBLISH packet is the exception to the rule: it is not started on base of a packet that gets sent by us
425
     */
426 1
    public function getOriginControlPacket(): int
427
    {
428 1
        return 0;
429
    }
430
}
431