Completed
Push — master ( 4ca550...f445a9 )
by Camilo
02:08
created

Publish::fillObject()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 37
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 37
ccs 0
cts 22
cp 0
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 21
nc 2
nop 2
crap 6
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\Message;
9
use unreal4u\MQTT\DataTypes\Topic;
10
use unreal4u\MQTT\DataTypes\PacketIdentifier;
11
use unreal4u\MQTT\DataTypes\QoSLevel;
12
use unreal4u\MQTT\Internals\ClientInterface;
13
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
14
use unreal4u\MQTT\Internals\ProtocolBase;
15
use unreal4u\MQTT\Internals\ReadableContent;
16
use unreal4u\MQTT\Internals\ReadableContentInterface;
17
use unreal4u\MQTT\Internals\WritableContent;
18
use unreal4u\MQTT\Internals\WritableContentInterface;
19
use unreal4u\MQTT\Utilities;
20
21
/**
22
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
23
 *
24
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
25
 */
26
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
27
{
28
    use ReadableContent, WritableContent, PacketIdentifierFunctionality;
29
30
    const CONTROL_PACKET_VALUE = 3;
31
32
    /**
33
     * Contains the message to be sent
34
     * @var Message
35
     */
36
    private $message;
37
38
    /**
39
     * Flag to check whether a message is a redelivery (DUP flag)
40
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
41
     * @var bool
42
     */
43
    public $isRedelivery = false;
44
45
    /**
     * Builds the variable header of the PUBLISH packet and recomputes the special flags of the fixed header.
     *
     * The variable header is the topic name encoded through createUTF8String(), followed by the binary packet
     * identifier whenever the QoS level of the message is not 0. As a side effect, $this->specialFlags is reset and
     * rebuilt: 8 for a redelivery (DUP), QoS level * 2 for the QoS bits and 1 for a retained message.
     *
     * @return string
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
     * @throws \OutOfRangeException
     * @throws \InvalidArgumentException When no message has been set on this object
     */
    public function createVariableHeader(): string
    {
        if ($this->message === null) {
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        $variableHeader = $this->createUTF8String($this->message->getTopicName());

        // Start from a clean slate: this object may be reused with another message
        $this->specialFlags = 0;

        if ($this->isRedelivery) {
            $this->logger->debug('Activating redelivery bit');
            // DUP flag: mark the message as a re-delivery
            $this->specialFlags |= 8;
        }

        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel !== 0) {
            // The QoS bits contribute 0 for QoS lvl0, 2 for QoS lvl1 and 4 for QoS lvl2
            $this->specialFlags |= ($qosLevel * 2);
            // Only messages with a QoS level above 0 carry a packet identifier
            $variableHeader .= $this->getPacketIdentifierBinaryRepresentation();
            $this->logger->debug(sprintf('Activating QoS level %d bit', $qosLevel), [
                'specialFlags' => $this->specialFlags,
            ]);
        }

        if ($this->message->isRetained()) {
            // RETAIN flag: ask the server to retain the message
            $this->specialFlags |= 1;
            $this->logger->debug('Activating retain flag', ['specialFlags' => $this->specialFlags]);
        }

        $this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]);

        return $variableHeader;
    }
88
89
    /**
     * Returns the payload of the message that has been set, after validating the message.
     *
     * @return string
     * @throws \InvalidArgumentException When no message has been set or when the message does not validate
     */
    public function createPayload(): string
    {
        // Same guard as createVariableHeader(): fail with a catchable exception instead of a fatal call on null
        if ($this->message === null) {
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        if (!$this->message->validateMessage()) {
            throw new \InvalidArgumentException('Invalid message');
        }

        return $this->message->getPayload();
    }
97
98
    /**
     * Tells whether an answer must be awaited after sending this packet.
     *
     * A message with QoS level 0 is not answered, so false is returned; any other QoS level returns true.
     *
     * @return bool
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    public function shouldExpectAnswer(): bool
    {
        $answerExpected = $this->message->getQoSLevel() !== 0;
        $this->logger->debug('Checking whether we should expect an answer or not', [
            'shouldExpectAnswer' => $answerExpected,
        ]);

        return $answerExpected;
    }
111
112 2
    /**
     * Creates the response object that corresponds to the QoS level of the message.
     *
     * QoS level 1 produces a PubAck and QoS level 2 a PubRec, both instantiated with the given data and client.
     * Any other level produces an EmptyReadableResponse.
     *
     * @param string $data
     * @param ClientInterface $client
     * @return ReadableContentInterface
     */
    public function expectAnswer(string $data, ClientInterface $client): ReadableContentInterface
    {
        $qosLevel = $this->message->getQoSLevel();

        if ($qosLevel === 1) {
            $acknowledgement = new PubAck($this->logger);
            $acknowledgement->instantiateObject($data, $client);
            return $acknowledgement;
        }

        if ($qosLevel === 2) {
            $received = new PubRec($this->logger);
            $received->instantiateObject($data, $client);
            return $received;
        }

        // QoS level 0 (and anything unexpected): there is nothing to read back
        return new EmptyReadableResponse($this->logger);
    }
128
129
    /**
     * Stores the message that this PUBLISH packet will transport.
     *
     * @param Message $message
     * @return WritableContentInterface This object, to allow method chaining
     */
    public function setMessage(Message $message): WritableContentInterface
    {
        $this->message = $message;

        return $this;
    }
140
141
    /**
     * Returns the message currently held by this PUBLISH packet.
     *
     * @return Message
     */
    public function getMessage(): Message
    {
        return $this->message;
    }
150
151
    /**
     * Decodes the flags in the first byte of the fixed header and applies them to the current message.
     *
     * Bit 0 becomes the retain flag of the message, the QoS level is delegated to determineIncomingQoSLevel() and
     * bit 3 (DUP) is stored in $this->isRedelivery, but only when the QoS level is not 0.
     *
     * @param int $firstByte
     * @return Publish
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function analyzeFirstByte(int $firstByte): Publish
    {
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);

        // Bit 0 of the first byte is the retain flag: default to false, enable when the bit is set
        $this->message->setRetainFlag(false);
        if (($firstByte & 1) !== 0) {
            $this->logger->debug('Setting retain flag to true');
            $this->message->setRetainFlag(true);
        }

        // The QoS level is encoded in bits 2 and 1 of the first byte
        $this->message->setQoSLevel($this->determineIncomingQoSLevel($firstByte));

        // Bit 3 is the duplicate flag; it is only honoured for QoS > 0, otherwise it stays false
        $this->isRedelivery = false;
        $duplicateBitSet = ($firstByte & 8) !== 0;
        if ($duplicateBitSet && $this->message->getQoSLevel() !== 0) {
            $this->isRedelivery = true;
            $this->logger->debug('Setting redelivery bit');
        }

        return $this;
    }
180
181
    /**
     * Extracts the QoS level from the first byte of the fixed header of a PUBLISH packet.
     *
     * The QoS level is a 2-bit field stored in bits 2 and 1 of that byte, so both bits must be masked (0b0110)
     * before shifting the value down by one position.
     *
     * @param int $bitString The first byte of the fixed header, as an integer
     * @return QoSLevel
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
    {
        // 0b000 => QoS lvl0, 0b010 => QoS lvl1, 0b100 => QoS lvl2.
        // 0b110 yields 3, which is reserved; presumably QoSLevel rejects it with InvalidQoSLevel (see @throws).
        $incomingQoSLevel = ($bitString & 6) >> 1;
        $this->logger->debug('Setting QoS level', ['incomingQoSLevel' => $incomingQoSLevel]);

        return new QoSLevel($incomingQoSLevel);
    }
195
196
    /**
     * Gets the full raw packet, reading from the broker whatever part has not been received yet.
     *
     * When only the first byte is available, the size byte and the rest of the packet are read from the client.
     * Otherwise the second byte is taken as the size and only the missing bytes are read. In both cases a fresh
     * Message is created and the first byte is analyzed to fill in its flags.
     *
     * NOTE(review): the size is read from a single byte, so packets whose remaining length needs more than one
     * byte (more than 127 bytes) do not seem to be supported here; fillObject() also relies on a 2-byte fixed
     * header. Confirm before sending larger messages through this code path.
     *
     * @param string $rawMQTTHeaders
     * @param ClientInterface $client
     * @return string First byte, size byte and the rest of the packet, concatenated in that order
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function getFullRawHeaders(string $rawMQTTHeaders, ClientInterface $client): string
    {
        if (\strlen($rawMQTTHeaders) === 1) {
            $this->logger->debug('Only one incoming byte, retrieving rest of size and the full payload');
            $restOfBytes = $client->readBrokerData(1);
            $payload = $client->readBrokerData(\ord($restOfBytes));
        } else {
            $this->logger->debug('More than 1 byte detected, calculating and retrieving the rest');
            // Square brackets instead of curly braces: the latter are deprecated since PHP 7.4 and gone in PHP 8
            $restOfBytes = $rawMQTTHeaders[1];
            $payload = substr($rawMQTTHeaders, 2);
            // Only request what has not been received yet
            $exactRest = \ord($restOfBytes) - \strlen($payload);
            $payload .= $client->readBrokerData($exactRest);
            $rawMQTTHeaders = $rawMQTTHeaders[0];
        }

        // At this point, $rawMQTTHeaders will be always 1 byte long
        $this->message = new Message();
        $this->analyzeFirstByte(\ord($rawMQTTHeaders));

        // $rawMQTTHeaders may have been redefined above
        return $rawMQTTHeaders . $restOfBytes . $payload;
    }
225
226
    /**
     * Parses an incoming PUBLISH packet and fills this object and its message with the data found.
     *
     * The complete packet is first obtained through getFullRawHeaders(). The layout assumed afterwards is:
     * 2 bytes of fixed header, 2 bytes of topic length, the topic itself, 2 bytes of packet identifier (only when
     * the QoS level is above 0) and finally the payload.
     *
     * @param string $rawMQTTHeaders
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \OutOfBoundsException
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \InvalidArgumentException
     * @throws \OutOfRangeException
     */
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
    {
        $rawMQTTHeaders = $this->getFullRawHeaders($rawMQTTHeaders, $client);

        // The topic length is a 16-bit big-endian number: byte 2 is the MSB, byte 3 the LSB
        $topicSize = (\ord($rawMQTTHeaders[2]) << 8) | \ord($rawMQTTHeaders[3]);

        $messageStartPosition = 4;
        $hasPacketIdentifier = $this->message->getQoSLevel() > 0;
        if ($hasPacketIdentifier) {
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
            // [2 (fixed header) + 2 (topic size) + $topicSize] marks the beginning of the 2 packet identifier bytes
            $identifierFirstByte = $rawMQTTHeaders[4 + $topicSize];
            $identifierSecondByte = $rawMQTTHeaders[5 + $topicSize];
            $this->setPacketIdentifier(new PacketIdentifier(Utilities::convertBinaryStringToNumber(
                $identifierFirstByte . $identifierSecondByte
            )));
            $this->logger->debug('Determined packet identifier', [
                'PI' => $this->getPacketIdentifier(),
                'firstBit' => \ord($identifierFirstByte),
                'secondBit' => \ord($identifierSecondByte),
            ]);
            $messageStartPosition += 2;
        }

        $this->logger->debug('Determined headers', [
            'topicSize' => $topicSize,
            'QoSLevel' => $this->message->getQoSLevel(),
            'isDuplicate' => $this->isRedelivery,
            'isRetained' => $this->message->isRetained(),
            // No identifier is parsed for QoS 0, so do not dereference the property in that case
            'packetIdentifier' => $hasPacketIdentifier
                ? $this->packetIdentifier->getPacketIdentifierValue()
                : null,
        ]);

        $this->message->setPayload(substr($rawMQTTHeaders, $messageStartPosition + $topicSize));
        // Safe to assume a constant here: first 2 bytes will always be fixed header, next 2 bytes are topic size
        $this->message->setTopic(new Topic(substr($rawMQTTHeaders, 4, $topicSize)));

        return $this;
    }
274
275
    /**
     * Answers an incoming PUBLISH packet according to the QoS level of its message.
     *
     * QoS level 1 is answered with a PubAck and QoS level 2 with a PubRec, both sent through the client; QoS
     * level 0 needs no response. The original request is not used by this implementation.
     *
     * @inheritdoc
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
    {
        $qosLevel = $this->message->getQoSLevel();

        if ($qosLevel === 1) {
            $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubAckAnswer());
        } elseif ($qosLevel === 2) {
            $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
            $client->processObject($this->composePubRecAnswer());
        } elseif ($qosLevel === 0) {
            // Context must be a key/value pair, just like the other two log calls
            $this->logger->debug('No response needed', ['qosLevel' => $qosLevel]);
        }

        return true;
    }
299
300
    /**
     * Composes a PubRec answer carrying the packet identifier currently set on this object
     *
     * @return PubRec
     */
    private function composePubRecAnswer(): PubRec
    {
        $answer = new PubRec($this->logger);
        $answer->setPacketIdentifier($this->packetIdentifier);

        return $answer;
    }
306
307
    /**
     * Composes a PubAck answer carrying the packet identifier currently set on this object
     *
     * @return PubAck
     */
    private function composePubAckAnswer(): PubAck
    {
        $answer = new PubAck($this->logger);
        $answer->setPacketIdentifier($this->packetIdentifier);

        return $answer;
    }
317
318
    /**
     * The PUBLISH packet is the exception to the rule: it is not sent in response to a packet that we sent
     * first, so 0 is returned as its origin control packet.
     *
     * @return int
     */
    public function getOriginControlPacket(): int
    {
        return 0;
    }
325
}
326