Completed
Push — master ( 5542e3...0d3b58 )
by Camilo
03:13
created

Publish   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 254
Duplicated Lines 0 %

Test Coverage

Coverage 34%

Importance

Changes 0
Metric Value
wmc 27
dl 0
loc 254
ccs 34
cts 100
cp 0.34
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
B createVariableHeader() 0 36 5
A createPayload() 0 7 2
A shouldExpectAnswer() 0 7 1
B fillObject() 0 45 3
A performSpecialActions() 0 15 4
A composePubAckAnswer() 0 5 1
A expectAnswer() 0 14 4
A setMessage() 0 4 1
A getMessage() 0 3 1
A analyzeFirstByte() 0 19 4
A determineIncomingQoSLevel() 0 4 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\Message;
9
use unreal4u\MQTT\Application\Topic;
10
use unreal4u\MQTT\DataTypes\QoSLevel;
11
use unreal4u\MQTT\Internals\ClientInterface;
12
use unreal4u\MQTT\Internals\ProtocolBase;
13
use unreal4u\MQTT\Internals\ReadableContent;
14
use unreal4u\MQTT\Internals\ReadableContentInterface;
15
use unreal4u\MQTT\Internals\WritableContent;
16
use unreal4u\MQTT\Internals\WritableContentInterface;
17
use unreal4u\MQTT\Utilities;
18
19
/**
20
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
21
 *
22
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
23
 */
24
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
25
{
26
    use ReadableContent, WritableContent;
27
28
    const CONTROL_PACKET_VALUE = 3;
29
30
    /**
31
     * Contains the message to be sent
32
     * @var Message
33
     */
34
    private $message;
35
36
    /**
37
     * Some interchanges with the broker will send or receive a packet identifier
38
     * @var int
39
     */
40
    public $packetIdentifier = 0;
41
42
    /**
43
     * Flag to check whether a message is a redelivery (DUP flag)
44
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
45
     * @var bool
46
     */
47
    public $isRedelivery = false;
48
49 4
    /**
     * Builds the variable header of the PUBLISH packet: the UTF-8 encoded topic name, followed by the packet
     * identifier when the QoS level of the message is above 0.
     *
     * As a side effect, $this->specialFlags is recalculated from scratch on every call: 8 marks a redelivery
     * (DUP), the QoS level multiplied by 2 is OR-ed in, and 1 marks a retained message. The packet identifier
     * is incremented on every call made for a message with a QoS level above 0.
     *
     * @return string
     * @throws \InvalidArgumentException When no message has been set yet
     */
    public function createVariableHeader(): string
    {
        if ($this->message === null) {
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        $variableHeader = $this->createUTF8String($this->message->getTopicName());
        $qosLevel = $this->message->getQoSLevel();

        // Reset the special flags should the object be reused with another message
        $this->specialFlags = 0;

        // DUP flag: must stay 0 on QoS level 0 messages, so only mark a re-delivery on higher levels. This is
        // the same rule analyzeFirstByte() applies to incoming packets
        if ($this->isRedelivery && $qosLevel !== 0) {
            $this->logger->debug('Activating redelivery bit');
            $this->specialFlags |= 8;
        }

        // Check QoS level and perform the corresponding actions
        if ($qosLevel !== 0) {
            // 2 for QoS lvl1 and 4 for QoS lvl2
            $this->specialFlags |= ($qosLevel * 2);
            $this->packetIdentifier++;
            $variableHeader .= Utilities::convertNumberToBinaryString($this->packetIdentifier);
            $this->logger->debug(sprintf('Activating QoS level %d bit', $qosLevel), [
                'specialFlags' => $this->specialFlags,
            ]);
        }

        if ($this->message->isRetained()) {
            // RETAIN flag: should the server retain the message?
            $this->specialFlags |= 1;
            $this->logger->debug('Activating retain flag', ['specialFlags' => $this->specialFlags]);
        }

        $this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]);

        return $variableHeader;
    }
86
87
    /**
     * Returns the payload of the message that has been set, after the message has validated itself.
     *
     * @return string
     * @throws \InvalidArgumentException When the message reports itself as invalid
     */
    public function createPayload(): string
    {
        $messageIsValid = $this->message->validateMessage();
        if ($messageIsValid) {
            return $this->message->getPayload();
        }

        throw new \InvalidArgumentException('Invalid message');
    }
95
96
    /**
     * Tells whether an answer must be awaited after sending this packet.
     *
     * A message with QoS level 0 does not have to wait for an answer, any other QoS level does.
     *
     * @return bool
     */
    public function shouldExpectAnswer(): bool
    {
        $answerExpected = $this->message->getQoSLevel() !== 0;
        $this->logger->debug(
            'Checking whether we should expect an answer or not',
            ['shouldExpectAnswer' => $answerExpected]
        );

        return $answerExpected;
    }
108
109 2
    /**
     * Creates the object that represents the answer to this packet, chosen by the QoS level of the message.
     *
     * QoS level 1 yields a PubAck and QoS level 2 a PubRec, both instantiated from the given data. Any other
     * level yields an EmptyReadableResponse, for which the given data is not used.
     *
     * @param string $data
     * @param ClientInterface $client
     * @return ReadableContentInterface
     */
    public function expectAnswer(string $data, ClientInterface $client): ReadableContentInterface
    {
        switch ($this->message->getQoSLevel()) {
            case 1:
                $answer = new PubAck($this->logger);
                break;
            case 2:
                $answer = new PubRec($this->logger);
                break;
            default:
                // Covers QoS level 0 as well: there is nothing to parse
                return new EmptyReadableResponse($this->logger);
        }

        $answer->instantiateObject($data, $client);

        return $answer;
    }
125
126
    /**
     * Stores the message that this PUBLISH packet will carry.
     *
     * @param Message $message
     * @return WritableContentInterface This same object, so calls can be chained
     */
    public function setMessage(Message $message): WritableContentInterface
    {
        $this->message = $message;

        return $this;
    }
137
138
    /**
     * Returns the message held by this packet: either the one given through setMessage() or the one built by
     * fillObject() from incoming data.
     *
     * @return Message
     */
    public function getMessage(): Message
    {
        return $this->message;
    }
147
148
    /**
     * Reads the flags out of the first byte of the fixed header of an incoming Publish packet.
     *
     * Sets the retain flag and the QoS level on the current message and updates $this->isRedelivery.
     *
     * @param int $firstByte
     * @return Publish
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function analyzeFirstByte(int $firstByte): Publish
    {
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);

        // Bit 0 of the first byte is the retained bit
        $this->message->setRetainFlag(($firstByte & 1) === 1);

        // Bits 2 and 1 of the first byte hold the QoS level
        $this->message->setQoSLevel($this->determineIncomingQoSLevel($firstByte));

        // Bit 3 of the first byte marks a duplicate, but it only counts on QoS > 0, otherwise it stays false
        $this->isRedelivery = ($firstByte & 8) === 8 && $this->message->getQoSLevel() !== 0;

        return $this;
    }
175
176
    /**
     * Finds out the QoS level in a fixed header for the Publish object
     *
     * The level lives in bits 2 and 1 of the first byte: 4 == QoS lvl2, 2 == QoS lvl1, 0 == QoS lvl0. The
     * extracted number is run through QoSLevel before being returned.
     *
     * @param int $bitString The first byte of the fixed header, as a number
     * @return int
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function determineIncomingQoSLevel(int $bitString): int
    {
        // Mask both QoS bits (binary 110 == 6) and shift them down one position. The parentheses are
        // mandatory: "/" binds tighter than "&", so "$bitString & 4 / 2" is evaluated as "$bitString & 2"
        $incomingQoSLevel = ($bitString & 6) >> 1;

        return (new QoSLevel($incomingQoSLevel))->getQoSLevel();
    }
188
189
    /**
     * Will perform sanity checks and fill in the Readable object with data
     *
     * Creates a new Message, sets its flags from the first byte of the fixed header and then extracts the
     * topic name, the packet identifier (only on QoS > 0) and the payload from the rest of the packet.
     *
     * NOTE(review): the remaining length is read as one single byte, so multi-byte remaining lengths (larger
     * packets) do not seem to be handled here — confirm whether a caller takes care of that.
     *
     * @param string $rawMQTTHeaders Either only the first byte of the packet, or the bytes read so far
     * @param ClientInterface $client Used to read whatever is still missing from the socket
     * @return ReadableContentInterface
     * @throws \OutOfBoundsException
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     * @throws \InvalidArgumentException
     * @throws \OutOfRangeException
     */
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
    {
        if (\strlen($rawMQTTHeaders) === 1) {
            $this->logger->debug('Fast check, read rest of data from socket');
            $restOfBytes = $client->readSocketData(1);
            $packetBody = $client->readSocketData(\ord($restOfBytes));
        } else {
            $this->logger->debug('Slow form, retransform data and read rest of data');
            $restOfBytes = $rawMQTTHeaders[1];
            $packetBody = substr($rawMQTTHeaders, 2);
            $exactRest = \ord($restOfBytes) - \strlen($packetBody);
            // Only go back to the socket when part of the packet is actually still missing
            if ($exactRest > 0) {
                $packetBody .= $client->readSocketData($exactRest);
            }
            $rawMQTTHeaders = $rawMQTTHeaders[0];
        }

        // At this point, $rawMQTTHeaders will be always 1 byte long
        $this->message = new Message();
        $this->analyzeFirstByte(\ord($rawMQTTHeaders));

        // The packet body begins with the topic size, encoded in 2 bytes: most significant byte first
        $topicSize = (\ord($packetBody[0]) << 8) | \ord($packetBody[1]);

        // The topic name always comes right after its size, regardless of the QoS level
        $cursor = 2;
        $topicName = substr($packetBody, $cursor, $topicSize);
        $cursor += $topicSize;

        if ($this->message->getQoSLevel() > 0) {
            // On QoS > 0, the 2 bytes of the packet identifier sit between the topic name and the payload
            // NOTE(review): the bytes are handed over in reversed order, exactly as before — confirm that
            // this is the order Utilities::convertBinaryStringToNumber() expects
            $this->packetIdentifier = Utilities::convertBinaryStringToNumber(
                $packetBody[$cursor + 1] . $packetBody[$cursor]
            );
            $cursor += 2;
        }

        $this->logger->debug('Determined headers', [
            'topicSize' => $topicSize,
            'QoSLevel' => $this->message->getQoSLevel(),
            'isDuplicate' => $this->isRedelivery,
            'isRetained' => $this->message->isRetained(),
            'packetIdentifier' => $this->packetIdentifier,
        ]);

        // The cast covers an empty payload, for which substr() returns false on PHP 7
        $this->message->setPayload((string) substr($packetBody, $cursor));
        $this->message->setTopic(new Topic($topicName));

        return $this;
    }
245
246
    /**
     * Answers a received PUBLISH according to the QoS level of its message: nothing is sent on level 0, a
     * PubAck is sent on level 1 and a PubRec on level 2.
     *
     * @inheritdoc
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
    {
        $qosLevel = $this->message->getQoSLevel();
        // Keyed context, so that all three log entries carry the level under the same name
        $logContext = ['qosLevel' => $qosLevel];

        if ($qosLevel === 0) {
            $this->logger->debug('No response needed', $logContext);
        } elseif ($qosLevel === 1) {
            $this->logger->debug('Responding with PubAck', $logContext);
            $client->sendData($this->composePubAckAnswer());
        } elseif ($qosLevel === 2) {
            $this->logger->debug('Responding with PubRec', $logContext);
            // NOTE(review): unlike the PubAck answer, this PubRec does not get $this->packetIdentifier
            // assigned before being sent — confirm whether PubRec obtains it some other way
            $client->sendData(new PubRec($this->logger));
        }

        return true;
    }
268
269
    /**
     * Builds the PubAck answer for this packet, carrying over the packet identifier that is currently set on
     * this object.
     *
     * @return PubAck
     */
    private function composePubAckAnswer(): PubAck
    {
        $answer = new PubAck($this->logger);
        $answer->packetIdentifier = $this->packetIdentifier;

        return $answer;
    }
279
}
280