Completed
Push — master ( 569d13...84f4c4 )
by Camilo
17s queued 10s
created

Publish::completePossibleIncompleteMessage()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 27
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 27
ccs 8
cts 8
cp 1
rs 10
c 0
b 0
f 0
cc 3
nc 4
nop 2
crap 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use InvalidArgumentException;
8
use OutOfBoundsException;
9
use OutOfRangeException;
10
use unreal4u\MQTT\Application\EmptyReadableResponse;
11
use unreal4u\MQTT\DataTypes\Message;
12
use unreal4u\MQTT\DataTypes\PacketIdentifier;
13
use unreal4u\MQTT\DataTypes\QoSLevel;
14
use unreal4u\MQTT\DataTypes\TopicName;
15
use unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined;
16
use unreal4u\MQTT\Exceptions\InvalidQoSLevel;
17
use unreal4u\MQTT\Exceptions\InvalidRequest;
18
use unreal4u\MQTT\Exceptions\InvalidResponseType;
19
use unreal4u\MQTT\Exceptions\MessageTooBig;
20
use unreal4u\MQTT\Exceptions\MissingTopicName;
21
use unreal4u\MQTT\Exceptions\NotConnected;
22
use unreal4u\MQTT\Internals\ClientInterface;
23
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
24
use unreal4u\MQTT\Internals\ProtocolBase;
25
use unreal4u\MQTT\Internals\ReadableContent;
26
use unreal4u\MQTT\Internals\ReadableContentInterface;
27
use unreal4u\MQTT\Internals\WritableContent;
28
use unreal4u\MQTT\Internals\WritableContentInterface;
29
use unreal4u\MQTT\Utilities;
30
use function ord;
31
use function strlen;
32
33
/**
34
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
35
 *
36
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
37
 *
38
 * QoS lvl1:
39
 *   First packet: PUBLISH
40
 *   Second packet: PUBACK
41
 *
42
 * QoS lvl2:
43
 *   First packet: PUBLISH
44
 *   Second packet: PUBREC
45
 *   Third packet: PUBREL
46
 *   Fourth packet: PUBCOMP
47
 *
48
 * @see https://go.gliffy.com/go/publish/12498076
49
 */
50
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
{
    use ReadableContent, /** @noinspection TraitsPropertiesConflictsInspection */
        WritableContent,
        PacketIdentifierFunctionality;

    /**
     * Control packet type value of a PUBLISH packet
     */
    const CONTROL_PACKET_VALUE = 3;

    /**
     * Contains the message to be sent (or, after fillObject(), the message that was received)
     * @var Message
     */
    private $message;

    /**
     * Flag to check whether a message is a redelivery (DUP flag)
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
     * @var bool
     */
    public $isRedelivery = false;

    /**
     * Builds the variable header: the UTF-8 encoded topic name, followed by the packet identifier for QoS > 0
     *
     * As a side effect the special flags are reset and recalculated for the current message.
     *
     * @return string
     * @throws InvalidQoSLevel
     * @throws MissingTopicName
     * @throws OutOfRangeException
     * @throws InvalidArgumentException When no message has been set yet
     */
    public function createVariableHeader(): string
    {
        if ($this->message === null) {
            throw new InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        $encodedTopicName = $this->createUTF8String($this->message->getTopicName());

        // Reset the special flags should the same object be reused with another message
        $this->specialFlags = 0;

        return $encodedTopicName . $this->createVariableHeaderFlags();
    }

    /**
     * Sets the DUP, RETAIN and QoS bits in the special flags and returns the rest of the variable header
     *
     * @return string The binary packet identifier for QoS 1 and 2, an empty string for QoS 0
     * @throws OutOfRangeException
     * @throws InvalidQoSLevel
     */
    private function createVariableHeaderFlags(): string
    {
        $qosLevel = $this->message->getQoSLevel();

        // DUP flag (bit 3): only valid on QoS > 0, the specification requires it to be 0 for all QoS 0 messages
        // [MQTT-3.3.1-2]. This mirrors what analyzeFirstByte() does for incoming packets.
        if ($this->isRedelivery && $qosLevel !== 0) {
            $this->specialFlags |= 8;
            $this->logger->debug('Activating redelivery bit');
        }

        // RETAIN flag (bit 0): should the server retain the message?
        if ($this->message->isRetained()) {
            $this->specialFlags |= 1;
            $this->logger->debug('Activating retain flag');
        }

        if ($qosLevel === 0) {
            // QoS 0 messages carry no packet identifier
            return '';
        }

        // The QoS level occupies bits 1-2 of the flags: this adds 2 for QoS lvl1 and 4 for QoS lvl2
        $this->specialFlags |= ($qosLevel * 2);
        $this->logger->debug(sprintf('Activating QoS level %d bit', $qosLevel));

        return $this->getPacketIdentifierBinaryRepresentation();
    }

    /**
     * Returns the payload of the message that was set
     *
     * @return string
     * @throws MissingTopicName
     * @throws MessageTooBig
     * @throws InvalidArgumentException When no message has been set yet
     */
    public function createPayload(): string
    {
        if ($this->message === null) {
            throw new InvalidArgumentException('A message must be set before publishing');
        }

        return $this->message->getPayload();
    }

    /**
     * QoS level 0 does not have to wait for an answer, so return false. Any other QoS level returns true
     *
     * @return bool
     * @throws InvalidQoSLevel
     */
    public function shouldExpectAnswer(): bool
    {
        $shouldExpectAnswer = $this->message->getQoSLevel() !== 0;
        $this->logger->debug('Checking whether we should expect an answer or not', [
            'shouldExpectAnswer' => $shouldExpectAnswer,
        ]);

        return $shouldExpectAnswer;
    }

    /**
     * Creates the object that corresponds to the answer the broker should send for this message
     *
     * QoS 1 is answered with a PubAck, QoS 2 with a PubRec; anything else yields an EmptyReadableResponse.
     *
     * @param string $brokerBitStream
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws InvalidResponseType
     */
    public function expectAnswer(string $brokerBitStream, ClientInterface $client): ReadableContentInterface
    {
        switch ($this->message->getQoSLevel()) {
            case 1:
                $answer = new PubAck($this->logger);
                break;
            case 2:
                $answer = new PubRec($this->logger);
                break;
            default:
                return new EmptyReadableResponse($this->logger);
        }

        $answer->instantiateObject($brokerBitStream, $client);

        return $answer;
    }

    /**
     * Sets the to be sent message
     *
     * @param Message $message
     * @return WritableContentInterface
     */
    public function setMessage(Message $message): WritableContentInterface
    {
        $this->message = $message;

        return $this;
    }

    /**
     * Gets the set message
     *
     * @return Message
     */
    public function getMessage(): Message
    {
        return $this->message;
    }

    /**
     * Sets several bits and pieces from the first byte of the fixed header for the Publish packet
     *
     * Updates the retain flag and QoS level of the current message, as well as $isRedelivery.
     *
     * @param int $firstByte
     * @param QoSLevel $qoSLevel
     * @return Publish
     * @throws InvalidQoSLevel
     */
    private function analyzeFirstByte(int $firstByte, QoSLevel $qoSLevel): Publish
    {
        $this->logger->debug('Analyzing first byte', [sprintf('%08b', $firstByte)]);

        // Retained bit is bit 0 of first byte
        $isRetained = ($firstByte & 1) === 1;
        if ($isRetained) {
            $this->logger->debug('Setting retain flag to true');
        }
        $this->message->setRetainFlag($isRetained);

        // QoS level has already been taken care of, assign it to the message at this point
        $this->message->setQoSLevel($qoSLevel);

        // Duplicate flag is always bit 3 of first byte, but it must only be honoured on QoS > 0
        $this->isRedelivery = ($firstByte & 8) === 8 && $this->message->getQoSLevel() !== 0;
        if ($this->isRedelivery) {
            $this->logger->debug('Setting redelivery bit');
        }

        return $this;
    }

    /**
     * Finds out the QoS level in a fixed header for the Publish object
     *
     * @param int $bitString The first byte of the fixed header as an integer
     * @return QoSLevel
     * @throws InvalidQoSLevel
     */
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
    {
        // QoS lvl are in bit positions 1-2. Shifting is strictly speaking not needed, but increases human comprehension
        $qosBits = $bitString >> 1;

        $incomingQoSLevel = 0;
        if (($qosBits & 1) === 1) {
            $incomingQoSLevel = 1;
        }
        // NOTE(review): when both QoS bits are set (an invalid combination according to the specification) this
        // resolves to level 2 instead of being rejected; confirm whether that leniency is intended.
        if (($qosBits & 2) === 2) {
            $incomingQoSLevel = 2;
        }

        $this->logger->debug('Setting QoS level', ['bitString' => $bitString, 'incomingQoSLevel' => $incomingQoSLevel]);

        return new QoSLevel($incomingQoSLevel);
    }

    /**
     * Reads from the client whatever is still missing in order to have the complete packet
     *
     * @param string $rawMQTTHeaders The bytes that were already read from the stream
     * @param ClientInterface $client
     * @return string The bytes already read plus everything that was additionally read
     * @throws OutOfBoundsException
     * @throws InvalidArgumentException
     * @throws MessageTooBig
     * @throws InvalidQoSLevel
     */
    private function completePossibleIncompleteMessage(string $rawMQTTHeaders, ClientInterface $client): string
    {
        // Read at least one extra byte from the stream if we know that the message is too short
        if (strlen($rawMQTTHeaders) < 2) {
            $rawMQTTHeaders .= $client->readBrokerData(1);
        }

        // Presumably this call also updates $this->sizeOfRemainingLengthField, which is read right below
        $restOfBytes = $this->performRemainingLengthFieldOperations($rawMQTTHeaders, $client);

        /*
         * A complete message consists of:
         *  - The very first byte
         *  - The size of the remaining length field (from 1 to 4 bytes)
         *  - The $restOfBytes
         *
         * So we have to compare what we already have vs the above calculation
         *
         * More information:
         * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180832
         */
        $completeMessageSize = $restOfBytes + $this->sizeOfRemainingLengthField + 1;
        $readableDataLeft = $completeMessageSize - strlen($rawMQTTHeaders);
        if ($readableDataLeft > 0) {
            // Read only the portion of data we have left from the socket
            $rawMQTTHeaders .= $client->readBrokerData($readableDataLeft);
        }

        return $rawMQTTHeaders;
    }

    /**
     * Will complete the incoming packet and fill in this object with the data it contains
     *
     * Layout of the parsed packet, where N is the size of the remaining length field:
     *  - byte 0: first byte of the fixed header (flags)
     *  - bytes 1 to N: remaining length field
     *  - next 2 bytes: size of the topic name, most significant byte first
     *  - next [topic size] bytes: topic name
     *  - next 2 bytes: packet identifier, only present when QoS > 0
     *  - everything else: payload
     *
     * @param string $rawMQTTHeaders
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws MessageTooBig
     * @throws OutOfBoundsException
     * @throws InvalidQoSLevel
     * @throws InvalidArgumentException
     * @throws OutOfRangeException
     */
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
    {
        // Retrieve full message first
        $fullMessage = $this->completePossibleIncompleteMessage($rawMQTTHeaders, $client);

        // Handy to have: the first byte
        $firstByte = ord($fullMessage[0]);

        // The topic size is a 2-byte big-endian integer located right after the remaining length field. Both
        // bytes must be taken into account, otherwise topic names longer than 255 bytes would be cut off
        $topicSizePosition = $this->sizeOfRemainingLengthField + 1;
        $topicSize = (ord($fullMessage[$topicSizePosition]) << 8) | ord($fullMessage[$topicSizePosition + 1]);

        // With the first byte, we can determine the QoS level of the incoming message
        $qosLevel = $this->determineIncomingQoSLevel($firstByte);

        $topicStartPosition = $topicSizePosition + 2;
        $payloadStartPosition = $topicStartPosition + $topicSize;

        // If we have a QoS level present, we must retrieve the packet identifier as well
        if ($qosLevel->getQoSLevel() > 0) {
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
            // The 2 packet identifier bytes are located directly after the topic name
            $this->setPacketIdentifier(new PacketIdentifier(Utilities::convertBinaryStringToNumber(
                $fullMessage[$payloadStartPosition] . $fullMessage[$payloadStartPosition + 1]
            )));
            $this->logger->debug('Determined packet identifier', ['PI' => $this->getPacketIdentifier()]);
            $payloadStartPosition += 2;
        }

        // Payload and topic name are known by now; retain, QoS and DUP are assigned by analyzeFirstByte() below
        $this->message = new Message(
            substr($fullMessage, $payloadStartPosition),
            new TopicName(substr($fullMessage, $topicStartPosition, $topicSize))
        );
        $this->analyzeFirstByte($firstByte, $qosLevel);

        // The packet identifier is deliberately left out of this context: it is not always set
        $this->logger->debug('Determined headers', [
            'topicSize' => $topicSize,
            'QoSLevel' => $this->message->getQoSLevel(),
            'isDuplicate' => $this->isRedelivery,
            'isRetained' => $this->message->isRetained(),
        ]);

        return $this;
    }

    /**
     * Answers a received message: a PubAck is sent for QoS 1, a PubRec for QoS 2, nothing for QoS 0
     *
     * @inheritdoc
     * @throws InvalidRequest
     * @throws InvalidQoSLevel
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     */
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
    {
        $qosLevel = $this->message->getQoSLevel();

        if ($qosLevel === 0) {
            $this->logger->debug('No response needed', ['qosLevel' => $qosLevel]);
        } elseif ($qosLevel === 1) {
            $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubAckAnswer());
        } elseif ($qosLevel === 2) {
            $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubRecAnswer());
        }

        return true;
    }

    /**
     * Composes a PubRec answer with the same packetIdentifier as what we received
     *
     * @return PubRec
     * @throws InvalidRequest When no packet identifier is set
     */
    private function composePubRecAnswer(): PubRec
    {
        $this->checkForValidPacketIdentifier();

        $pubRec = new PubRec($this->logger);
        $pubRec->setPacketIdentifier($this->packetIdentifier);

        return $pubRec;
    }

    /**
     * Composes a PubAck answer with the same packetIdentifier as what we received
     *
     * @return PubAck
     * @throws InvalidRequest When no packet identifier is set
     */
    private function composePubAckAnswer(): PubAck
    {
        $this->checkForValidPacketIdentifier();

        $pubAck = new PubAck($this->logger);
        $pubAck->setPacketIdentifier($this->packetIdentifier);

        return $pubAck;
    }

    /**
     * Will check whether the current object has a packet identifier set. If not, we are in serious problems!
     *
     * @return Publish
     * @throws InvalidRequest When no packet identifier is set
     */
    private function checkForValidPacketIdentifier(): self
    {
        if ($this->packetIdentifier === null) {
            $this->logger->critical('No valid packet identifier found at a stage where there MUST be one set');
            throw new InvalidRequest('You are trying to send a request without a valid packet identifier');
        }

        return $this;
    }

    /**
     * PUBLISH packet is the exception to the rule: it is not started on base of a packet that gets sent by us
     *
     * @return int Always 0
     */
    public function getOriginControlPacket(): int
    {
        return 0;
    }
}
426