Passed
Push — master ( 093280...d8d21f )
by Camilo
02:24
created

Publish::composePubAckAnswer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 0
cts 4
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\DataTypes\Message;
9
use unreal4u\MQTT\DataTypes\PacketIdentifier;
10
use unreal4u\MQTT\DataTypes\Topic;
11
use unreal4u\MQTT\DataTypes\QoSLevel;
12
use unreal4u\MQTT\Internals\ClientInterface;
13
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
14
use unreal4u\MQTT\Internals\ProtocolBase;
15
use unreal4u\MQTT\Internals\ReadableContent;
16
use unreal4u\MQTT\Internals\ReadableContentInterface;
17
use unreal4u\MQTT\Internals\WritableContent;
18
use unreal4u\MQTT\Internals\WritableContentInterface;
19
use unreal4u\MQTT\Utilities;
20
21
/**
22
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
23
 *
24
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
25
 */
26
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
27
{
28
    use ReadableContent, WritableContent, PacketIdentifierFunctionality;
29
30
    const CONTROL_PACKET_VALUE = 3;
31
32
    /**
33
     * Contains the message to be sent
34
     * @var Message
35
     */
36
    private $message;
37
38
    /**
39
     * Flag to check whether a message is a redelivery (DUP flag)
40
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
41
     * @var bool
42
     */
43
    public $isRedelivery = false;
44
45
    /**
46
     * @return string
47
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
48
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
49
     * @throws \OutOfRangeException
50
     * @throws \InvalidArgumentException
51
     */
52 5
    public function createVariableHeader(): string
53
    {
54 5
        if ($this->message === null) {
55 1
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
56
        }
57
58 4
        $bitString = $this->createUTF8String($this->message->getTopicName());
59
        // Reset the special flags should the object be reused with another message
60 4
        $this->specialFlags = 0;
61
62 4
        if ($this->isRedelivery) {
63
            $this->logger->debug('Activating redelivery bit');
64
            // DUP flag: if the message is a re-delivery, mark it as such
65
            $this->specialFlags |= 8;
66
        }
67
68
        // Check QoS level and perform the corresponding actions
69 4
        if ($this->message->getQoSLevel() !== 0) {
70
            // 0 for QoS lvl2 for QoS lvl1 and 4 for QoS lvl2
71 3
            $this->specialFlags |= ($this->message->getQoSLevel() * 2);
72 3
            $bitString .= $this->getPacketIdentifierBinaryRepresentation();
73 3
            $this->logger->debug(sprintf('Activating QoS level %d bit', $this->message->getQoSLevel()), [
74 3
                'specialFlags' => $this->specialFlags,
75
            ]);
76
        }
77
78 4
        if ($this->message->isRetained()) {
79
            // RETAIN flag: should the server retain the message?
80 1
            $this->specialFlags |= 1;
81 1
            $this->logger->debug('Activating retain flag', ['specialFlags' => $this->specialFlags]);
82
        }
83
84 4
        $this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]);
85
86 4
        return $bitString;
87
    }
88
89
    /**
90
     * @return string
91
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
92
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
93
     * @throws \InvalidArgumentException
94
     */
95 2
    public function createPayload(): string
96
    {
97 2
        if ($this->message === null) {
98 1
            throw new \InvalidArgumentException('A message must be set before publishing');
99
        }
100 1
        return $this->message->getPayload();
101
    }
102
103
    /**
104
     * QoS level 0 does not have to wait for a answer, so return false. Any other QoS level returns true
105
     * @return bool
106
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
107
     */
108 2
    public function shouldExpectAnswer(): bool
109
    {
110 2
        $shouldExpectAnswer = !($this->message->getQoSLevel() === 0);
111 2
        $this->logger->debug('Checking whether we should expect an answer or not', [
112 2
            'shouldExpectAnswer' => $shouldExpectAnswer,
113
        ]);
114 2
        return $shouldExpectAnswer;
115
    }
116
117 3
    public function expectAnswer(string $data, ClientInterface $client): ReadableContentInterface
118
    {
119 3
        switch ($this->message->getQoSLevel()) {
120 3
            case 1:
121 1
                $pubAck = new PubAck($this->logger);
122 1
                $pubAck->instantiateObject($data, $client);
123 1
                return $pubAck;
124 2
            case 2:
125 1
                $pubRec = new PubRec($this->logger);
126 1
                $pubRec->instantiateObject($data, $client);
127 1
                return $pubRec;
128 1
            case 0:
129
            default:
130 1
                return new EmptyReadableResponse($this->logger);
131
        }
132
    }
133
134
    /**
135
     * Sets the to be sent message
136
     *
137
     * @param Message $message
138
     * @return WritableContentInterface
139
     */
140 16
    public function setMessage(Message $message): WritableContentInterface
141
    {
142 16
        $this->message = $message;
143 16
        return $this;
144
    }
145
146
    /**
147
     * Gets the set message
148
     *
149
     * @return Message
150
     */
151 8
    public function getMessage(): Message
152
    {
153 8
        return $this->message;
154
    }
155
156
    /**
157
     * Sets several bits and pieces from the first byte of the fixed header for the Publish packet
158
     *
159
     * @param int $firstByte
160
     * @param QoSLevel $qoSLevel
161
     * @return Publish
162
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
163
     */
164 7
    private function analyzeFirstByte(int $firstByte, QoSLevel $qoSLevel): Publish
165
    {
166 7
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);
167
        // Retained bit is bit 0 of first byte
168 7
        $this->message->setRetainFlag(false);
169 7
        if (($firstByte & 1) === 1) {
170 3
            $this->logger->debug('Setting retain flag to true');
171 3
            $this->message->setRetainFlag(true);
172
        }
173
        // QoS level is already been taken care of, assign it to the message at this point
174 7
        $this->message->setQoSLevel($qoSLevel);
175
176
        // Duplicate message must be checked only on QoS > 0, else set it to false
177 7
        $this->isRedelivery = false;
178 7
        if (($firstByte & 8) === 8 && $this->message->getQoSLevel() !== 0) {
179
            // Is a duplicate is always bit 3 of first byte
180 2
            $this->isRedelivery = true;
181 2
            $this->logger->debug('Setting redelivery bit');
182
        }
183
184 7
        return $this;
185
    }
186
187
    /**
188
     * Finds out the QoS level in a fixed header for the Publish object
189
     *
190
     * @param int $bitString
191
     * @return QoSLevel
192
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
193
     */
194 4
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
195
    {
196
        // QoS lvl are in bit positions 1-2. Shifting is strictly speaking not needed, but increases human comprehension
197 4
        $shiftedBits = $bitString >> 1;
198 4
        $incomingQoSLevel = 0;
199 4
        if (($shiftedBits & 1) === 1) {
200 2
            $incomingQoSLevel = 1;
201
        }
202 4
        if (($shiftedBits & 2) === 2) {
203 1
            $incomingQoSLevel = 2;
204
        }
205
206 4
        $this->logger->debug('Setting QoS level', ['bitString' => $bitString, 'incomingQoSLevel' => $incomingQoSLevel]);
207 4
        return new QoSLevel($incomingQoSLevel);
208
    }
209
210
    /**
211
     * Gets the full message in case this object needs to
212
     *
213
     * @param string $rawMQTTHeaders
214
     * @param ClientInterface $client
215
     * @return string
216
     * @throws \OutOfBoundsException
217
     * @throws \InvalidArgumentException
218
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
219
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
220
     */
221
    private function completePossibleIncompleteMessage(string $rawMQTTHeaders, ClientInterface $client): string
222
    {
223
        if (\strlen($rawMQTTHeaders) === 1) {
224
            $this->logger->debug('Only one incoming byte, retrieving rest of size and the full payload');
225
            $restOfBytes = $client->readBrokerData(1);
226
            $payload = $client->readBrokerData(\ord($restOfBytes));
227
        } else {
228
            $this->logger->debug('More than 1 byte detected, calculating and retrieving the rest');
229
            $restOfBytes = $rawMQTTHeaders{1};
230
            $payload = substr($rawMQTTHeaders, 2);
231
            $exactRest = \ord($restOfBytes) - \strlen($payload);
232
            $payload .= $client->readBrokerData($exactRest);
233
            $rawMQTTHeaders = $rawMQTTHeaders{0};
234
        }
235
236
        // $rawMQTTHeaders may be redefined
237
        return $rawMQTTHeaders . $restOfBytes . $payload;
238
    }
239
240
    /**
241
     * Will perform sanity checks and fill in the Readable object with data
242
     * @param string $rawMQTTHeaders
243
     * @param ClientInterface $client
244
     * @return ReadableContentInterface
245
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
246
     * @throws \OutOfBoundsException
247
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
248
     * @throws \InvalidArgumentException
249
     * @throws \OutOfRangeException
250
     */
251
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
252
    {
253
        $rawMQTTHeaders = $this->completePossibleIncompleteMessage($rawMQTTHeaders, $client);
254
        // Handy to maintain for debugging purposes
255
        #$this->logger->debug('Headers in binary form', [DebugTools::convertToBinaryRepresentation($rawMQTTHeaders)]);
256
257
        // Topic size is always the 3rd byte
258
        $firstByte = \ord($rawMQTTHeaders{0});
259
        $topicSize = \ord($rawMQTTHeaders{3});
260
        $qosLevel = $this->determineIncomingQoSLevel($firstByte);
261
262
        $messageStartPosition = 4;
263
        if ($qosLevel->getQoSLevel() > 0) {
264
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
265
            // [2 (fixed header) + 2 (topic size) + $topicSize] marks the beginning of the 2 packet identifier bytes
266
            $this->setPacketIdentifier(new PacketIdentifier(Utilities::convertBinaryStringToNumber(
267
                $rawMQTTHeaders{4 + $topicSize} . $rawMQTTHeaders{5 + $topicSize}
268
            )));
269
            $this->logger->debug('Determined packet identifier', [
270
                'PI' => $this->getPacketIdentifier(),
271
                'firstBit' => \ord($rawMQTTHeaders{4 + $topicSize}),
272
                'secondBit' => \ord($rawMQTTHeaders{5 + $topicSize})
273
            ]);
274
            $messageStartPosition += 2;
275
        }
276
277
        // At this point $rawMQTTHeaders will be always 1 byte long, initialize a Message object with dummy data for now
278
        $this->message = new Message(
279
            // Save to assume a constant here: first 2 bytes will always be fixed header, next 2 bytes are topic size
280
            substr($rawMQTTHeaders, $messageStartPosition + $topicSize),
281
            new Topic(substr($rawMQTTHeaders, 4, $topicSize))
282
        );
283
        $this->analyzeFirstByte(\ord($rawMQTTHeaders{0}), $qosLevel);
284
285
        $this->logger->debug('Determined headers', [
286
            'topicSize' => $topicSize,
287
            'QoSLevel' => $this->message->getQoSLevel(),
288
            'isDuplicate' => $this->isRedelivery,
289
            'isRetained' => $this->message->isRetained(),
290
            #'packetIdentifier' => $this->packetIdentifier->getPacketIdentifierValue(), // This is not always set!
291
        ]);
292
293
294
        return $this;
295
    }
296
297
    /**
298
     * @inheritdoc
299
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
300
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
301
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
302
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
303
     */
304
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
305
    {
306
        $qosLevel = $this->message->getQoSLevel();
307
        if ($qosLevel === 0) {
308
            $this->logger->debug('No response needed', ['qosLevel', $qosLevel]);
309
        } else {
310
            if ($qosLevel === 1) {
311
                $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
312
                $client->processObject($this->composePubAckAnswer());
313
            } elseif ($qosLevel === 2) {
314
                $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
315
                $client->processObject($this->composePubRecAnswer());
316
            }
317
        }
318
319
        return true;
320
    }
321
322
    private function composePubRecAnswer(): PubRec
323
    {
324
        $pubRec = new PubRec($this->logger);
325
        $pubRec->setPacketIdentifier($this->packetIdentifier);
326
        return $pubRec;
327
    }
328
329
    /**
330
     * Composes a PubAck answer with the same packetIdentifier as what we received
331
     * @return PubAck
332
     */
333
    private function composePubAckAnswer(): PubAck
334
    {
335
        $pubAck = new PubAck($this->logger);
336
        $pubAck->setPacketIdentifier($this->packetIdentifier);
337
        return $pubAck;
338
    }
339
340
    /**
341
     * PUBLISH packet is the exception to the rule: it is not started on base of a packet that gets sent by us
342
     */
343
    public function getOriginControlPacket(): int
344
    {
345
        return 0;
346
    }
347
}
348