Passed
Push — master ( 6659fb...086e81 )
by Camilo
02:20
created

Publish::fillObject()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 54
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 54
ccs 0
cts 34
cp 0
rs 9.6716
c 0
b 0
f 0
cc 3
eloc 34
nc 4
nop 2
crap 12

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\Message;
9
use unreal4u\MQTT\Application\Topic;
10
use unreal4u\MQTT\DataTypes\QoSLevel;
11
use unreal4u\MQTT\Internals\ClientInterface;
12
use unreal4u\MQTT\Internals\ProtocolBase;
13
use unreal4u\MQTT\Internals\ReadableContent;
14
use unreal4u\MQTT\Internals\ReadableContentInterface;
15
use unreal4u\MQTT\Internals\WritableContent;
16
use unreal4u\MQTT\Internals\WritableContentInterface;
17
use unreal4u\MQTT\Utilities;
18
19
/**
20
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
21
 *
22
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
23
 */
24
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
25
{
26
    use ReadableContent, WritableContent;
27
28
    const CONTROL_PACKET_VALUE = 3;
29
30
    /**
31
     * Contains the message to be sent
32
     * @var Message
33
     */
34
    private $message;
35
36
    /**
37
     * Some interchanges with the broker will send or receive a packet identifier
38
     * @var int
39
     */
40
    public $packetIdentifier = 0;
41
42
    /**
43
     * Flag to check whether a message is a redelivery (DUP flag)
44
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
45
     * @var bool
46
     */
47
    public $isRedelivery = false;
48
49
    /**
50
     * @return string
51
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
52
     * @throws \unreal4u\MQTT\Exceptions\MissingTopicName
53
     * @throws \OutOfRangeException
54
     * @throws \InvalidArgumentException
55
     */
56 4
    public function createVariableHeader(): string
57
    {
58 4
        if ($this->message === null) {
59 1
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
60
        }
61
62 3
        $bitString = $this->createUTF8String($this->message->getTopicName());
63
        // Reset the special flags should the object be reused with another message
64 3
        $this->specialFlags = 0;
65
66 3
        if ($this->isRedelivery) {
67
            $this->logger->debug('Activating redelivery bit');
68
            // DUP flag: if the message is a re-delivery, mark it as such
69
            $this->specialFlags |= 8;
70
        }
71
72
        // Check QoS level and perform the corresponding actions
73 3
        if ($this->message->getQoSLevel() !== 0) {
74
            // 0 for QoS lvl2 for QoS lvl1 and 4 for QoS lvl2
75 2
            $this->specialFlags |= ($this->message->getQoSLevel() * 2);
76 2
            $bitString .= Utilities::convertNumberToBinaryString($this->packetIdentifier);
77 2
            $this->logger->debug(sprintf('Activating QoS level %d bit', $this->message->getQoSLevel()), [
78 2
                'specialFlags' => $this->specialFlags,
79
            ]);
80
        }
81
82 3
        if ($this->message->isRetained()) {
83
            // RETAIN flag: should the server retain the message?
84 1
            $this->specialFlags |= 1;
85 1
            $this->logger->debug('Activating retain flag', ['specialFlags' => $this->specialFlags]);
86
        }
87
88 3
        $this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]);
89
90 3
        return $bitString;
91
    }
92
93
    public function createPayload(): string
94
    {
95
        if (!$this->message->validateMessage()) {
96
            throw new \InvalidArgumentException('Invalid message');
97
        }
98
99
        return $this->message->getPayload();
100
    }
101
102
    /**
103
     * QoS level 0 does not have to wait for a answer, so return false. Any other QoS level returns true
104
     * @return bool
105
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
106
     */
107 2
    public function shouldExpectAnswer(): bool
108
    {
109 2
        $shouldExpectAnswer = !($this->message->getQoSLevel() === 0);
110 2
        $this->logger->debug('Checking whether we should expect an answer or not', [
111 2
            'shouldExpectAnswer' => $shouldExpectAnswer,
112
        ]);
113 2
        return $shouldExpectAnswer;
114
    }
115
116 2
    public function expectAnswer(string $data, ClientInterface $client): ReadableContentInterface
117
    {
118 2
        switch ($this->message->getQoSLevel()) {
119 2
            case 1:
120 1
                $pubAck = new PubAck($this->logger);
121 1
                $pubAck->instantiateObject($data, $client);
122 1
                return $pubAck;
123 1
            case 2:
124
                $pubRec = new PubRec($this->logger);
125
                $pubRec->instantiateObject($data, $client);
126
                return $pubRec;
127 1
            case 0:
128
            default:
129 1
                return new EmptyReadableResponse($this->logger);
130
        }
131
    }
132
133
    /**
134
     * Sets the to be sent message
135
     *
136
     * @param Message $message
137
     * @return WritableContentInterface
138
     */
139 6
    public function setMessage(Message $message): WritableContentInterface
140
    {
141 6
        $this->message = $message;
142 6
        return $this;
143
    }
144
145
    /**
146
     * Gets the set message
147
     *
148
     * @return Message
149
     */
150
    public function getMessage(): Message
151
    {
152
        return $this->message;
153
    }
154
155
    /**
156
     * Sets several bits and pieces from the first byte of the fixed header for the Publish packet
157
     *
158
     * @param int $firstByte
159
     * @return Publish
160
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
161
     */
162
    private function analyzeFirstByte(int $firstByte): Publish
163
    {
164
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);
165
        // Retained bit is bit 0 of first byte
166
        $this->message->setRetainFlag(false);
167
        if ($firstByte & 1) {
168
            $this->logger->debug('Setting retain flag to true');
169
            $this->message->setRetainFlag(true);
170
        }
171
        // QoS level are the last bits 2 & 1 of the first byte
172
        $this->message->setQoSLevel($this->determineIncomingQoSLevel($firstByte));
173
174
        // Duplicate message must be checked only on QoS > 0, else set it to false
175
        $this->isRedelivery = false;
176
        if ($firstByte & 8 && $this->message->getQoSLevel() !== 0) {
177
            // Is a duplicate is always bit 3 of first byte
178
            $this->isRedelivery = true;
179
            $this->logger->debug('Setting redelivery bit');
180
        }
181
182
        return $this;
183
    }
184
185
    /**
186
     * Finds out the QoS level in a fixed header for the Publish object
187
     *
188
     * @param int $bitString
189
     * @return QoSLevel
190
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
191
     */
192
    private function determineIncomingQoSLevel(int $bitString): QoSLevel
193
    {
194
        // Strange operation, why? Because 4 == QoS lvl2; 2 == QoS lvl1, 0 == QoS lvl0
195
        $incomingQoSLevel = ($bitString & 4) / 2;
196
        $this->logger->debug('Setting QoS level', ['incomingQoSLevel' => $incomingQoSLevel]);
197
        return new QoSLevel($incomingQoSLevel);
198
    }
199
200
    /**
201
     * Will perform sanity checks and fill in the Readable object with data
202
     * @param string $rawMQTTHeaders
203
     * @param ClientInterface $client
204
     * @return ReadableContentInterface
205
     * @throws \OutOfBoundsException
206
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
207
     * @throws \InvalidArgumentException
208
     * @throws \OutOfRangeException
209
     */
210
    public function fillObject(string $rawMQTTHeaders, ClientInterface $client): ReadableContentInterface
211
    {
212
        if (\strlen($rawMQTTHeaders) === 1) {
213
            $this->logger->debug('Fast check, read rest of data from socket');
214
            $restOfBytes = $client->readSocketData(1);
215
            $payload = $client->readSocketData(\ord($restOfBytes));
216
        } else {
217
            $this->logger->debug('Slow form, retransform data and read rest of data');
218
            $restOfBytes = $rawMQTTHeaders{1};
219
            $payload = substr($rawMQTTHeaders, 2);
220
            $exactRest = \ord($restOfBytes) - \strlen($payload);
221
            $payload .= $client->readSocketData($exactRest);
222
            $rawMQTTHeaders = $rawMQTTHeaders{0};
223
        }
224
225
        // At this point, $rawMQTTHeaders will be always 1 byte long
226
        $this->message = new Message();
227
        $this->analyzeFirstByte(\ord($rawMQTTHeaders));
228
        // $rawMQTTHeaders may be redefined
229
        $rawMQTTHeaders = $rawMQTTHeaders . $restOfBytes . $payload;
230
        #$this->logger->debug('complete headers', ['header' => str2bin($rawMQTTHeaders)]);
231
232
        // Topic size is always the 3rd byte
233
        $topicSize = \ord($rawMQTTHeaders{3});
234
235
        $messageStartPosition = 4;
236
        if ($this->message->getQoSLevel() > 0) {
237
            $this->logger->debug('QoS level above 0, shifting message start position and getting packet identifier');
238
            // [2 (fixed header) + 2 (topic size) + $topicSize] marks the beginning of the 2 packet identifier bytes
239
            $this->packetIdentifier = Utilities::convertBinaryStringToNumber(
240
                $rawMQTTHeaders{4 + $topicSize} . $rawMQTTHeaders{5 + $topicSize}
241
            );
242
            $this->logger->debug('Determined packet identifier', [
243
                'calculated' => $this->packetIdentifier,
244
                'firstBit' => \ord($rawMQTTHeaders{4 + $topicSize}),
245
                'secondBit' => \ord($rawMQTTHeaders{5 + $topicSize})
246
            ]);
247
            $messageStartPosition += 2;
248
        }
249
250
        $this->logger->debug('Determined headers', [
251
            'topicSize' => $topicSize,
252
            'QoSLevel' => $this->message->getQoSLevel(),
253
            'isDuplicate' => $this->isRedelivery,
254
            'isRetained' => $this->message->isRetained(),
255
            'packetIdentifier' => $this->packetIdentifier,
256
        ]);
257
258
        $this->message->setPayload(substr($rawMQTTHeaders, $messageStartPosition + $topicSize));
259
        // Save to assume a constant here: first 2 bytes will always be fixed header, next 2 bytes are topic size
260
        $this->message->setTopic(new Topic(substr($rawMQTTHeaders, 4, $topicSize)));
261
        #$this->logger->debug('Found a topic name', ['name' => $this->message->getTopicName()]);
262
263
        return $this;
264
    }
265
266
    /**
267
     * @inheritdoc
268
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
269
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
270
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
271
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
272
     */
273
    public function performSpecialActions(ClientInterface $client, WritableContentInterface $originalRequest): bool
274
    {
275
        $qosLevel = $this->message->getQoSLevel();
276
        if ($qosLevel === 0) {
277
            $this->logger->debug('No response needed', ['qosLevel', $qosLevel]);
278
        } else {
279
            if ($qosLevel === 1) {
280
                $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
281
                $client->sendData($this->composePubAckAnswer());
282
            } elseif ($qosLevel === 2) {
283
                $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
284
                $client->sendData($this->composePubRecAnswer());
285
            }
286
        }
287
288
        return true;
289
    }
290
291
    private function composePubRecAnswer(): PubRec
292
    {
293
        $pubRec = new PubRec($this->logger);
294
        $pubRec->packetIdentifier = $this->packetIdentifier;
295
        return $pubRec;
296
    }
297
298
    /**
299
     * Composes a PubAck answer with the same packetIdentifier as what we received
300
     * @return PubAck
301
     */
302
    private function composePubAckAnswer(): PubAck
303
    {
304
        $pubAck = new PubAck($this->logger);
305
        $pubAck->packetIdentifier = $this->packetIdentifier;
306
        return $pubAck;
307
    }
308
309
    /**
310
     * PUBLISH packet is the exception to the rule: it is not started on base of a packet that gets sent by us
311
     */
312
    public function originPacketIdentifier(): int
313
    {
314
        return 0;
315
    }
316
}
317