Passed
Push — master ( f127af...513769 )
by Camilo
02:40
created

Publish::setMessage()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\DataTypes\Message;
9
use unreal4u\MQTT\DataTypes\PacketIdentifier;
10
use unreal4u\MQTT\DataTypes\Topic;
11
use unreal4u\MQTT\DataTypes\QoSLevel;
12
use unreal4u\MQTT\Exceptions\InvalidRequest;
13
use unreal4u\MQTT\Internals\ClientInterface;
14
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
15
use unreal4u\MQTT\Internals\ProtocolBase;
16
use unreal4u\MQTT\Internals\ReadableContent;
17
use unreal4u\MQTT\Internals\ReadableContentInterface;
18
use unreal4u\MQTT\Internals\WritableContent;
19
use unreal4u\MQTT\Internals\WritableContentInterface;
20
use unreal4u\MQTT\Utilities;
21
22
/**
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
 *
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
 *
 * QoS lvl1:
 *   First packet: PUBLISH
 *   Second packet: PUBACK
 *
 * QoS lvl2:
 *   First packet: PUBLISH
 *   Second packet: PUBREC
 *   Third packet: PUBREL
 *   Fourth packet: PUBCOMP
 *
 * @see https://go.gliffy.com/go/publish/12498076
 */
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
{
    use ReadableContent, WritableContent, PacketIdentifierFunctionality;

    const CONTROL_PACKET_VALUE = 3;

    /**
     * Contains the message to be sent (or the message that was received through fillObject()).
     *
     * Stays null until setMessage() or fillObject() has been called.
     *
     * @var Message|null
     */
    private $message;

    /**
     * Flag to check whether a message is a redelivery (DUP flag)
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
     * @var bool
     */
    public $isRedelivery = false;

    /**
     * Builds the variable header (topic name plus, for QoS > 0, the packet identifier) and, as a side effect,
     * recalculates $this->specialFlags (DUP, QoS and RETAIN bits) for the message that is currently set.
     *
     * @return string
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
     * @throws \OutOfRangeException
     * @throws \InvalidArgumentException When no message has been set yet
     */
    public function createVariableHeader(): string
    {
        if ($this->message === null) {
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        $variableHeader = $this->createUTF8String($this->message->getTopicName());
        // Reset the special flags should the object be reused with another message
        $this->specialFlags = 0;

        if ($this->isRedelivery) {
            $this->logger->debug('Activating redelivery bit');
            // DUP flag (bit 3): if the message is a re-delivery, mark it as such
            $this->specialFlags |= 8;
        }

        // Check QoS level and perform the corresponding actions
        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel !== 0) {
            // QoS occupies bits 1-2: adds 0 for QoS lvl0, 2 for QoS lvl1 and 4 for QoS lvl2
            $this->specialFlags |= ($qosLevel * 2);
            // Only QoS levels above 0 carry a packet identifier after the topic name
            $variableHeader .= $this->getPacketIdentifierBinaryRepresentation();
            $this->logger->debug(sprintf('Activating QoS level %d bit', $qosLevel), [
                'specialFlags' => $this->specialFlags,
            ]);
        }

        if ($this->message->isRetained()) {
            // RETAIN flag (bit 0): should the server retain the message?
            $this->specialFlags |= 1;
            $this->logger->debug('Activating retain flag', ['specialFlags' => $this->specialFlags]);
        }

        $this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]);

        return $variableHeader;
    }

    /**
     * Returns the payload of the message that is currently set.
     *
     * @return string
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
     * @throws \InvalidArgumentException When no message has been set yet
     */
    public function createPayload(): string
    {
        if ($this->message === null) {
            throw new \InvalidArgumentException('A message must be set before publishing');
        }

        return $this->message->getPayload();
    }

    /**
     * QoS level 0 does not have to wait for an answer, so return false. Any other QoS level returns true
     *
     * NOTE(review): unlike createVariableHeader() and createPayload(), this method does not check whether a
     * message has been set before dereferencing it.
     *
     * @return bool
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    public function shouldExpectAnswer(): bool
    {
        $shouldExpectAnswer = $this->message->getQoSLevel() !== 0;
        $this->logger->debug('Checking whether we should expect an answer or not', [
            'shouldExpectAnswer' => $shouldExpectAnswer,
        ]);

        return $shouldExpectAnswer;
    }

    /**
     * Creates the object that represents the answer to this PUBLISH, depending on the QoS level of the message:
     * a PubAck for QoS 1, a PubRec for QoS 2 and an EmptyReadableResponse for anything else.
     *
     * @param string $data Raw data that is handed over to the answer object
     * @param ClientInterface $client
     * @return ReadableContentInterface
     */
    public function expectAnswer(string $data, ClientInterface $client): ReadableContentInterface
    {
        switch ($this->message->getQoSLevel()) {
            case 1:
                $pubAck = new PubAck($this->logger);
                $pubAck->instantiateObject($data, $client);
                return $pubAck;
            case 2:
                $pubRec = new PubRec($this->logger);
                $pubRec->instantiateObject($data, $client);
                return $pubRec;
            case 0:
            default:
                return new EmptyReadableResponse($this->logger);
        }
    }

    /**
     * Sets the to be sent message
     *
     * @param Message $message
     * @return WritableContentInterface This same object, to allow chaining
     */
    public function setMessage(Message $message): WritableContentInterface
    {
        $this->message = $message;

        return $this;
    }

    /**
     * Gets the set message
     *
     * NOTE(review): the return type is not nullable, so calling this before a message has been set will fail.
     *
     * @return Message
     */
    public function getMessage(): Message
    {
        return $this->message;
    }

    /**
     * Sets several bits and pieces from the first byte of the fixed header for the Publish packet
     *
     * Transfers the RETAIN bit and the given QoS level onto the current message and updates $this->isRedelivery.
     *
     * @param int $firstByte
     * @param QoSLevel $qoSLevel
     * @return Publish
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function analyzeFirstByte(int $firstByte, QoSLevel $qoSLevel): Publish
    {
        $this->logger->debug('Analyzing first byte', [sprintf('%08b', $firstByte)]);

        // Retained bit is bit 0 of first byte
        $isRetained = ($firstByte & 1) === 1;
        if ($isRetained) {
            $this->logger->debug('Setting retain flag to true');
        }
        $this->message->setRetainFlag($isRetained);

        // QoS level has already been taken care of, assign it to the message at this point
        $this->message->setQoSLevel($qoSLevel);

        // Duplicate flag is bit 3 of first byte, but it is only honoured on QoS > 0, else it is forced to false
        $this->isRedelivery = ($firstByte & 8) === 8 && $this->message->getQoSLevel() !== 0;
        if ($this->isRedelivery) {
            $this->logger->debug('Setting redelivery bit');
        }

        return $this;
    }

    /**
     * Finds out the QoS level in a fixed header for the Publish object
     *
     * @param int $bitString Numeric value of the first byte of the fixed header
     * @return QoSLevel
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
    {
        // QoS lvl are in bit positions 1-2. Shifting is strictly speaking not needed, but increases human comprehension
        $shiftedBits = $bitString >> 1;
        $incomingQoSLevel = 0;
        if (($shiftedBits & 1) === 1) {
            $incomingQoSLevel = 1;
        }
        // Checked last on purpose: when the higher bit is set, level 2 wins regardless of the lower bit
        if (($shiftedBits & 2) === 2) {
            $incomingQoSLevel = 2;
        }

        $this->logger->debug('Setting QoS level', ['bitString' => $bitString, 'incomingQoSLevel' => $incomingQoSLevel]);

        return new QoSLevel($incomingQoSLevel);
    }

    /**
     * Makes sure the complete packet is available, reading any missing bytes from the broker.
     *
     * The returned string is laid out as: first header byte, size byte, everything that follows.
     *
     * NOTE(review): the size is read from one single byte. The MQTT "remaining length" field can span up to
     * 4 bytes, so bigger packets look unsupported by this method - confirm against the callers.
     *
     * @param string $rawMQTTHeaders The bytes that were already read from the stream
     * @param ClientInterface $client Used to read whatever is still missing
     * @return string
     * @throws \OutOfBoundsException
     * @throws \InvalidArgumentException
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function completePossibleIncompleteMessage(string $rawMQTTHeaders, ClientInterface $client): string
    {
        if (\strlen($rawMQTTHeaders) === 1) {
            $this->logger->debug('Only one incoming byte, retrieving rest of size and the full payload');
            $firstByte = $rawMQTTHeaders;
            $restOfBytes = $client->readBrokerData(1);
            $payload = $client->readBrokerData(\ord($restOfBytes));
        } else {
            $this->logger->debug('More than 1 byte detected, calculating and retrieving the rest');
            $firstByte = $rawMQTTHeaders[0];
            $restOfBytes = $rawMQTTHeaders[1];
            $payload = substr($rawMQTTHeaders, 2);
            // Only ask the broker for the part we do not have in our buffer yet
            $exactRest = \ord($restOfBytes) - \strlen($payload);
            $payload .= $client->readBrokerData($exactRest);
        }

        return $firstByte . $restOfBytes . $payload;
    }

    /**
     * Will perform sanity checks and fill in the Readable object with data
     *
     * Expected layout (after completion): 2 bytes of fixed header, 2 bytes of topic length, the topic itself,
     * 2 bytes of packet identifier (QoS > 0 only) and finally the payload.
     *
     * @param string $rawMQTTHeaders
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \unreal4u\MQTT\Exceptions\MessageTooBig
     * @throws \OutOfBoundsException
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \InvalidArgumentException
     * @throws \OutOfRangeException
     */
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
    {
        $rawMQTTHeaders = $this->completePossibleIncompleteMessage($rawMQTTHeaders, $client);

        $firstByte = \ord($rawMQTTHeaders[0]);
        // Topic length is a 2-byte big-endian number located right after the 2 bytes of fixed header
        $topicSize = (\ord($rawMQTTHeaders[2]) << 8) | \ord($rawMQTTHeaders[3]);
        $qosLevel = $this->determineIncomingQoSLevel($firstByte);

        // 2 (fixed header) + 2 (topic size)
        $topicStartPosition = 4;
        $messageStartPosition = $topicStartPosition;
        if ($qosLevel->getQoSLevel() > 0) {
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
            // [2 (fixed header) + 2 (topic size) + $topicSize] marks the beginning of the 2 packet identifier bytes
            $identifierPosition = $topicStartPosition + $topicSize;
            $this->setPacketIdentifier(new PacketIdentifier(Utilities::convertBinaryStringToNumber(
                $rawMQTTHeaders[$identifierPosition] . $rawMQTTHeaders[$identifierPosition + 1]
            )));
            $this->logger->debug('Determined packet identifier', [
                'PI' => $this->getPacketIdentifier(),
                'firstBit' => \ord($rawMQTTHeaders[$identifierPosition]),
                'secondBit' => \ord($rawMQTTHeaders[$identifierPosition + 1])
            ]);
            $messageStartPosition += 2;
        }

        // Payload is whatever comes after the topic (and the packet identifier, if any)
        $this->message = new Message(
            substr($rawMQTTHeaders, $messageStartPosition + $topicSize),
            new Topic(substr($rawMQTTHeaders, $topicStartPosition, $topicSize))
        );
        // Retain flag, QoS level and redelivery flag all originate from the first byte
        $this->analyzeFirstByte($firstByte, $qosLevel);

        // The packet identifier is left out on purpose: it is not set for QoS level 0
        $this->logger->debug('Determined headers', [
            'topicSize' => $topicSize,
            'QoSLevel' => $this->message->getQoSLevel(),
            'isDuplicate' => $this->isRedelivery,
            'isRetained' => $this->message->isRetained(),
        ]);

        return $this;
    }

    /**
     * @inheritdoc
     *
     * Answers a received PUBLISH: nothing for QoS 0, a PubAck for QoS 1 and a PubRec for QoS 2.
     *
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
    {
        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel === 0) {
            $this->logger->debug('No response needed', ['qosLevel' => $qosLevel]);
        } elseif ($qosLevel === 1) {
            $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubAckAnswer());
        } elseif ($qosLevel === 2) {
            $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubRecAnswer());
        }

        return true;
    }

    /**
     * Composes a PubRec answer with the same packetIdentifier as what we received
     *
     * @return PubRec
     * @throws \unreal4u\MQTT\Exceptions\InvalidRequest
     */
    private function composePubRecAnswer(): PubRec
    {
        $this->checkForValidPacketIdentifier();
        $pubRec = new PubRec($this->logger);
        $pubRec->setPacketIdentifier($this->packetIdentifier);

        return $pubRec;
    }

    /**
     * Composes a PubAck answer with the same packetIdentifier as what we received
     *
     * @return PubAck
     * @throws \unreal4u\MQTT\Exceptions\InvalidRequest
     */
    private function composePubAckAnswer(): PubAck
    {
        $this->checkForValidPacketIdentifier();
        $pubAck = new PubAck($this->logger);
        $pubAck->setPacketIdentifier($this->packetIdentifier);

        return $pubAck;
    }

    /**
     * Will check whether the current object has a packet identifier set. If not, we are in serious problems!
     *
     * @return Publish
     * @throws \unreal4u\MQTT\Exceptions\InvalidRequest When no packet identifier is set
     */
    private function checkForValidPacketIdentifier(): self
    {
        if ($this->packetIdentifier === null) {
            $this->logger->critical('No valid packet identifier found at a stage where there MUST be one set');
            throw new InvalidRequest('You are trying to send a request without a valid packet identifier');
        }

        return $this;
    }

    /**
     * PUBLISH packet is the exception to the rule: it is not started on base of a packet that gets sent by us
     *
     * @return int Always 0
     */
    public function getOriginControlPacket(): int
    {
        return 0;
    }
}
387