Passed
Push — master ( d06a83...d939b7 )
by Camilo
02:23
created

Publish::getMessage()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\DataTypes\Message;
9
use unreal4u\MQTT\DataTypes\PacketIdentifier;
10
use unreal4u\MQTT\DataTypes\Topic;
11
use unreal4u\MQTT\DataTypes\QoSLevel;
12
use unreal4u\MQTT\Exceptions\InvalidRequest;
13
use unreal4u\MQTT\Internals\ClientInterface;
14
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
15
use unreal4u\MQTT\Internals\ProtocolBase;
16
use unreal4u\MQTT\Internals\ReadableContent;
17
use unreal4u\MQTT\Internals\ReadableContentInterface;
18
use unreal4u\MQTT\Internals\WritableContent;
19
use unreal4u\MQTT\Internals\WritableContentInterface;
20
use unreal4u\MQTT\Utilities;
21
22
/**
23
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
24
 *
25
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
26
 *
27
 * QoS lvl1:
28
 *   First packet: PUBLISH
29
 *   Second packet: PUBACK
30
 *
31
 * QoS lvl2:
32
 *   First packet: PUBLISH
33
 *   Second packet: PUBREC
34
 *   Third packet: PUBREL
35
 *   Fourth packet: PUBCOMP
36
 *
37
 * @see https://go.gliffy.com/go/publish/12498076
38
 */
39
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
40
{
41
    use ReadableContent, WritableContent, PacketIdentifierFunctionality;
42
43
    const CONTROL_PACKET_VALUE = 3;
44
45
    /**
46
     * Contains the message to be sent
47
     * @var Message
48
     */
49
    private $message;
50
51
    /**
52
     * Flag to check whether a message is a redelivery (DUP flag)
53
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
54
     * @var bool
55
     */
56
    public $isRedelivery = false;
57
58
    /**
     * Builds the variable header of the PUBLISH packet
     *
     * The result is the topic name, encoded through createUTF8String(), followed by whatever
     * createVariableHeaderFlags() returns. As a side effect the special flags of this object are recalculated.
     *
     * @return string
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
     * @throws \OutOfRangeException
     * @throws \InvalidArgumentException When no message has been set yet
     */
    public function createVariableHeader(): string
    {
        if ($this->message === null) {
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        $encodedTopicName = $this->createUTF8String($this->message->getTopicName());

        // Start from a clean slate: this object may be reused, flags of a previous message must not leak through
        $this->specialFlags = 0;
        $flagDependentPart = $this->createVariableHeaderFlags();

        $this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]);

        return $encodedTopicName . $flagDependentPart;
    }
80
81
    /**
     * Translates the redelivery, retain and QoS settings into the special flags of this packet
     *
     * Returns the binary packet identifier when the QoS level of the message is not 0, an empty string otherwise.
     *
     * @return string
     * @throws \OutOfRangeException
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function createVariableHeaderFlags(): string
    {
        // DUP flag (value 8): the message is a re-delivery, mark it as such
        if ($this->isRedelivery) {
            $this->specialFlags |= 8;
            $this->logger->debug('Activating redelivery bit');
        }

        // RETAIN flag (value 1): should the server retain the message?
        if ($this->message->isRetained()) {
            $this->specialFlags |= 1;
            $this->logger->debug('Activating retain flag');
        }

        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel === 0) {
            // QoS lvl0 sets no bits and carries no packet identifier
            return '';
        }

        // QoS bits: adds 2 for QoS lvl1 and 4 for QoS lvl2
        $this->specialFlags |= ($qosLevel * 2);
        $this->logger->debug(sprintf('Activating QoS level %d bit', $qosLevel));

        return $this->getPacketIdentifierBinaryRepresentation();
    }
112
113
    /**
     * Returns the payload of the message that has been set on this object
     *
     * @return string
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
     * @throws \InvalidArgumentException When no message has been set yet
     */
    public function createPayload(): string
    {
        if ($this->message !== null) {
            return $this->message->getPayload();
        }

        throw new \InvalidArgumentException('A message must be set before publishing');
    }
126
127
    /**
     * Tells whether an answer must be awaited after sending this packet
     *
     * A message with QoS level 0 yields false, any other QoS level yields true.
     *
     * @return bool
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    public function shouldExpectAnswer(): bool
    {
        $answerExpected = $this->message->getQoSLevel() !== 0;

        $this->logger->debug(
            'Checking whether we should expect an answer or not',
            ['shouldExpectAnswer' => $answerExpected]
        );

        return $answerExpected;
    }
140
141 3
    /**
     * Creates the response object that corresponds to the QoS level of the message
     *
     * QoS lvl1 yields a PubAck and QoS lvl2 a PubRec, both instantiated with the incoming data. Any other level
     * yields an EmptyReadableResponse.
     *
     * @param string $data
     * @param ClientInterface $client
     * @return ReadableContentInterface
     */
    public function expectAnswer(string $data, ClientInterface $client): ReadableContentInterface
    {
        $qosLevel = $this->message->getQoSLevel();

        if ($qosLevel === 1) {
            $acknowledgement = new PubAck($this->logger);
            $acknowledgement->instantiateObject($data, $client);
            return $acknowledgement;
        }

        if ($qosLevel === 2) {
            $received = new PubRec($this->logger);
            $received->instantiateObject($data, $client);
            return $received;
        }

        // QoS lvl0 (or anything unexpected): there is nothing to read
        return new EmptyReadableResponse($this->logger);
    }
157
158
    /**
     * Stores the message that this PUBLISH packet will transport
     *
     * @param Message $message
     * @return WritableContentInterface This same object, which allows method chaining
     */
    public function setMessage(Message $message): WritableContentInterface
    {
        $this->message = $message;

        return $this;
    }
169
170
    /**
     * Returns the message currently held by this object
     *
     * NOTE(review): the property is only filled by setMessage() or fillObject(); calling this before either of
     * them looks like it would violate the declared return type — confirm callers never do so.
     *
     * @return Message
     */
    public function getMessage(): Message
    {
        return $this->message;
    }
179
180
    /**
     * Applies the flags found in the first byte of the fixed header to the message and to this object
     *
     * Bit 0 is the retain flag and bit 3 the duplicate (redelivery) flag. The QoS level is not derived here: the
     * already determined level is simply assigned to the message.
     *
     * @param int $firstByte
     * @param QoSLevel $qoSLevel
     * @return Publish
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function analyzeFirstByte(int $firstByte, QoSLevel $qoSLevel): Publish
    {
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);

        // Retain flag lives in bit 0
        $retained = ($firstByte & 1) === 1;
        if ($retained) {
            $this->logger->debug('Setting retain flag to true');
        }
        $this->message->setRetainFlag($retained);

        // The QoS level has been determined beforehand, hand it over to the message
        $this->message->setQoSLevel($qoSLevel);

        // Duplicate flag lives in bit 3, but it is only honoured when the QoS level is above 0
        $this->isRedelivery = ($firstByte & 8) === 8 && $this->message->getQoSLevel() !== 0;
        if ($this->isRedelivery) {
            $this->logger->debug('Setting redelivery bit');
        }

        return $this;
    }
210
211
    /**
     * Extracts the QoS level from the first byte of the fixed header of a PUBLISH packet
     *
     * The level is stored in bit positions 1-2 of that byte.
     *
     * NOTE(review): when both QoS bits are set the result is level 2 instead of an error, although that
     * combination looks invalid for a PUBLISH packet — confirm whether it should be rejected.
     *
     * @param int $bitString
     * @return QoSLevel
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
    {
        // Drop bit 0 (retain flag) so that the QoS bits become the two lowest ones
        $qosBits = $bitString >> 1;

        if (($qosBits & 2) === 2) {
            $detectedLevel = 2;
        } elseif (($qosBits & 1) === 1) {
            $detectedLevel = 1;
        } else {
            $detectedLevel = 0;
        }

        $this->logger->debug('Setting QoS level', ['bitString' => $bitString, 'incomingQoSLevel' => $detectedLevel]);

        return new QoSLevel($detectedLevel);
    }
233
234
    /**
     * Makes sure the complete packet is available, reading whatever is still missing from the broker
     *
     * Two situations are handled:
     *  - Only one byte was received: the size byte and after that the announced number of bytes are read.
     *  - More than one byte was received: the second byte is taken as the size and only the part that has not
     *    arrived yet is read.
     *
     * NOTE(review): the size is always taken from one single byte, so a multi-byte "remaining length" field does
     * not look supported here — confirm how bigger packets reach this method.
     *
     * @param string $rawMQTTHeaders
     * @param ClientInterface $client
     * @return string First byte, size byte and the rest of the packet, concatenated in that order
     * @throws \OutOfBoundsException
     * @throws \InvalidArgumentException
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function completePossibleIncompleteMessage(string $rawMQTTHeaders, ClientInterface $client): string
    {
        if (\strlen($rawMQTTHeaders) === 1) {
            $this->logger->debug('Only one incoming byte, retrieving rest of size and the full payload');
            $restOfBytes = $client->readBrokerData(1);
            $payload = $client->readBrokerData(\ord($restOfBytes));
        } else {
            $this->logger->debug('More than 1 byte detected, calculating and retrieving the rest');
            // Square brackets: the curly brace offset syntax is deprecated in PHP 7.4 and removed in PHP 8.0
            $restOfBytes = $rawMQTTHeaders[1];
            $payload = substr($rawMQTTHeaders, 2);
            // Only ask the broker for the part that was not delivered together with the headers
            $exactRest = \ord($restOfBytes) - \strlen($payload);
            $payload .= $client->readBrokerData($exactRest);
            // Keep only the first byte, the remainder is re-attached below
            $rawMQTTHeaders = $rawMQTTHeaders[0];
        }

        // At this point $rawMQTTHeaders holds exactly the first byte of the packet
        return $rawMQTTHeaders . $restOfBytes . $payload;
    }
263
264
    /**
     * Parses a raw incoming PUBLISH packet and fills this object with its contents
     *
     * Assumed layout of the completed packet, as read by this method:
     *  - offset 0: first byte of the fixed header (flags)
     *  - offset 1: size byte
     *  - offset 3: topic size
     *  - offset 4 and further: topic name, optionally 2 bytes of packet identifier (QoS > 0), then the payload
     *
     * NOTE(review): only the byte at offset 3 is used as topic size; the byte at offset 2 is ignored, so topic
     * names above 255 bytes do not look supported — confirm.
     *
     * @param string $rawMQTTHeaders
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
     * @throws \OutOfBoundsException
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \InvalidArgumentException
     * @throws \OutOfRangeException
     */
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
    {
        $rawMQTTHeaders = $this->completePossibleIncompleteMessage($rawMQTTHeaders, $client);

        // Square brackets: the curly brace offset syntax is deprecated in PHP 7.4 and removed in PHP 8.0
        $firstByte = \ord($rawMQTTHeaders[0]);
        // Topic size is read from offset 3 (the 4th byte)
        $topicSize = \ord($rawMQTTHeaders[3]);
        $qosLevel = $this->determineIncomingQoSLevel($firstByte);

        // 2 bytes of fixed header + 2 bytes of topic size come before the topic name
        $messageStartPosition = 4;
        if ($qosLevel->getQoSLevel() > 0) {
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
            // [2 (fixed header) + 2 (topic size) + $topicSize] marks the beginning of the 2 packet identifier bytes
            $this->setPacketIdentifier(new PacketIdentifier(Utilities::convertBinaryStringToNumber(
                $rawMQTTHeaders[4 + $topicSize] . $rawMQTTHeaders[5 + $topicSize]
            )));
            $this->logger->debug('Determined packet identifier', ['PI' => $this->getPacketIdentifier()]);
            $messageStartPosition += 2;
        }

        // Payload is everything after the topic name (and packet identifier, if any); the topic name starts at 4
        $this->message = new Message(
            substr($rawMQTTHeaders, $messageStartPosition + $topicSize),
            new Topic(substr($rawMQTTHeaders, 4, $topicSize))
        );
        // Retain flag, QoS level and redelivery flag are applied to the freshly created message
        $this->analyzeFirstByte($firstByte, $qosLevel);

        // The packet identifier is deliberately left out of this log entry: it is not always set
        $this->logger->debug('Determined headers', [
            'topicSize' => $topicSize,
            'QoSLevel' => $this->message->getQoSLevel(),
            'isDuplicate' => $this->isRedelivery,
            'isRetained' => $this->message->isRetained(),
        ]);

        return $this;
    }
316
317
    /**
     * Answers an incoming PUBLISH packet according to the QoS level of its message
     *
     * QoS lvl0 sends nothing, QoS lvl1 makes the client process a PubAck and QoS lvl2 a PubRec. The original
     * request is not used by this implementation.
     *
     * @inheritdoc
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
    {
        $qosLevel = $this->message->getQoSLevel();

        if ($qosLevel === 0) {
            // Context must be a key => value map, just like in the other branches
            $this->logger->debug('No response needed', ['qosLevel' => $qosLevel]);
        } elseif ($qosLevel === 1) {
            $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubAckAnswer());
        } elseif ($qosLevel === 2) {
            $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubRecAnswer());
        }

        return true;
    }
341
342
    /**
     * Creates the PubRec object that answers this packet, carrying over the packet identifier of this object
     *
     * @return PubRec
     * @throws \unreal4u\MQTT\Exceptions\InvalidRequest When this object has no packet identifier
     */
    private function composePubRecAnswer(): PubRec
    {
        // Without a packet identifier there is nothing the answer could refer to
        $this->checkForValidPacketIdentifier();

        $answer = new PubRec($this->logger);
        $answer->setPacketIdentifier($this->packetIdentifier);

        return $answer;
    }
355
356
    /**
     * Creates the PubAck object that answers this packet, carrying over the packet identifier of this object
     *
     * @return PubAck
     * @throws \unreal4u\MQTT\Exceptions\InvalidRequest When this object has no packet identifier
     */
    private function composePubAckAnswer(): PubAck
    {
        // Without a packet identifier there is nothing the answer could refer to
        $this->checkForValidPacketIdentifier();

        $answer = new PubAck($this->logger);
        $answer->setPacketIdentifier($this->packetIdentifier);

        return $answer;
    }
369
370
    /**
     * Guards against a missing packet identifier at a point where one is required
     *
     * @return Publish This same object when a packet identifier is present
     * @throws \unreal4u\MQTT\Exceptions\InvalidRequest When no packet identifier is set
     */
    private function checkForValidPacketIdentifier(): self
    {
        if ($this->packetIdentifier !== null) {
            return $this;
        }

        $this->logger->critical('No valid packet identifier found at a stage where there MUST be one set');
        throw new InvalidRequest('You are trying to send a request without a valid packet identifier');
    }
385
386
    /**
     * The PUBLISH packet is the exception to the rule: it is not started on base of a packet that gets sent by
     * us, which is why 0 is returned here
     *
     * @return int
     */
    public function getOriginControlPacket(): int
    {
        return 0;
    }
393
}
394