Completed
Push — master ( 1d7f97...fc052b )
by Camilo
01:42
created

Publish::shouldExpectAnswer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\Message;
9
use unreal4u\MQTT\Application\PayloadInterface;
10
use unreal4u\MQTT\Client;
11
use unreal4u\MQTT\Exceptions\InvalidQoSLevel;
12
use unreal4u\MQTT\Internals\ProtocolBase;
13
use unreal4u\MQTT\Internals\ReadableContent;
14
use unreal4u\MQTT\Internals\ReadableContentInterface;
15
use unreal4u\MQTT\Internals\WritableContent;
16
use unreal4u\MQTT\Internals\WritableContentInterface;
17
use unreal4u\MQTT\Utilities;
18
19
/**
20
 * A PUBLISH Control Packet is sent from a Client to a Server or vice-versa to transport an Application Message.
21
 *
22
 * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
23
 */
24
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface
25
{
26
    use ReadableContent;
27
    use WritableContent;
28
29
    /**
     * Control packet type value of a PUBLISH packet, as assigned by the MQTT 3.1.1 specification
     */
    const CONTROL_PACKET_VALUE = 3;
30
31
    /**
32
     * The application message: provided through setMessage() before sending, or created by fillObject() when a
     * PUBLISH packet is received
33
     * @var Message
34
     */
35
    private $message;
36
37
    /**
38
     * Packet identifier of the exchange. Advanced by createVariableHeader() and overwritten by fillObject(), in both
     * cases only when the QoS level of the message is higher than 0
39
     * @var int
40
     */
41
    public $packetIdentifier = 0;
42
43
    /**
44
     * Flag to check whether a message is a redelivery (DUP flag, bit 3 of the first byte of the fixed header)
45
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
46
     * @var bool
47
     */
48
    public $isRedelivery = false;
49
50
    /**
51
     * Payload prototype set through setPayloadType(); fillObject() clones it to wrap the contents of an incoming
     * message
     * @var PayloadInterface
52
     */
53
    private $payloadType;
54
55 4
    /**
     * Builds the variable header of the PUBLISH packet and derives the flags of the fixed header
     *
     * The variable header is the UTF-8 encoded topic name, followed by the packet identifier when the QoS level is
     * higher than 0. As a side effect, $this->specialFlags is rebuilt from scratch (DUP = 8, QoS = 2 or 4,
     * RETAIN = 1) and, for QoS > 0, $this->packetIdentifier is advanced to the next value.
     *
     * @return string
     * @throws \InvalidArgumentException When no message has been set
     */
    public function createVariableHeader(): string
    {
        if ($this->message === null) {
            throw new \InvalidArgumentException('You must at least provide a message object with a topic name');
        }

        $bitString = $this->createUTF8String($this->message->getTopicName());
        // Reset the special flags should the object be reused with another message
        $this->specialFlags = 0;

        if ($this->isRedelivery) {
            $this->logger->debug('Activating redelivery bit');
            // DUP flag: if the message is a re-delivery, mark it as such
            $this->specialFlags |= 8;
        }

        // Check QoS level and perform the corresponding actions
        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel !== 0) {
            // 2 for QoS lvl1 and 4 for QoS lvl2
            $this->specialFlags |= ($qosLevel * 2);
            // A packet identifier is a non-zero 16 bit value: advance it, wrapping from 65535 back to 1 instead of
            // growing past what fits in the two bytes reserved for it
            $this->packetIdentifier = ($this->packetIdentifier % 0xFFFF) + 1;
            $bitString .= Utilities::convertNumberToBinaryString($this->packetIdentifier);
            $this->logger->debug(sprintf('Activating QoS level %d bit', $qosLevel), [
                'specialFlags' => $this->specialFlags,
            ]);
        }

        if ($this->message->isRetained()) {
            // RETAIN flag: should the server retain the message?
            $this->specialFlags |= 1;
            $this->logger->debug('Activating retain flag', ['specialFlags' => $this->specialFlags]);
        }

        $this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]);

        return $bitString;
    }
92
93
    /**
     * Registers the payload prototype that incoming messages will be wrapped in
     *
     * The object is stored as-is; fillObject() works on a clone of it for each packet it parses.
     *
     * @param PayloadInterface $payloadType
     * @return Publish The current object, to allow chaining
     */
    public function setPayloadType(PayloadInterface $payloadType): Publish
    {
        $this->payloadType = $payloadType;
        return $this;
    }
105
106
    /**
     * Returns the payload of the message that is about to be published
     *
     * @return string
     * @throws \InvalidArgumentException When the message does not pass its own validation
     */
    public function createPayload(): string
    {
        if ($this->message->validateMessage()) {
            return $this->message->getPayload();
        }

        throw new \InvalidArgumentException('Invalid message');
    }
114
115
    /**
     * Tells whether the broker will answer this packet: only messages with a QoS level other than 0 are acknowledged
     *
     * @return bool False for QoS level 0, true for any other QoS level
     */
    public function shouldExpectAnswer(): bool
    {
        return $this->message->getQoSLevel() !== 0;
    }
123
124 2
    /**
     * Builds the object that represents the answer of the broker to this packet
     *
     * When no answer is expected (QoS level 0) an empty response object is returned and $data is ignored; in any
     * other case a PubAck is populated with the received data.
     * NOTE(review): per MQTT 3.1.1 a QoS level 2 publish is answered with a PUBREC, yet a PubAck is built here for
     * every QoS level above 0 - confirm whether that is intended.
     *
     * @param string $data Raw data as received from the broker
     * @return ReadableContentInterface
     */
    public function expectAnswer(string $data): ReadableContentInterface
    {
        if (!$this->shouldExpectAnswer()) {
            return new EmptyReadableResponse($this->logger);
        }

        $acknowledgement = new PubAck($this->logger);
        $acknowledgement->populate($data);

        return $acknowledgement;
    }
134
135
    /**
     * Assigns the message that this packet will publish
     *
     * @param Message $message
     * @return WritableContentInterface The current object, to allow chaining
     */
    public function setMessage(Message $message): WritableContentInterface
    {
        $this->message = $message;

        return $this;
    }
146
147
    /**
     * Returns the message currently held by this packet
     *
     * This is either the message given to setMessage() or the one built by fillObject().
     *
     * @return Message
     */
    public function getMessage(): Message
    {
        return $this->message;
    }
156
157
    /**
     * Decodes the flags stored in the first byte of the fixed header of a received PUBLISH packet
     *
     * Bit 0 is copied into the retain flag of the message, bits 2 and 1 become its QoS level and bit 3 (DUP) is
     * stored in $this->isRedelivery.
     *
     * @param int $firstByte
     * @return Publish
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    private function analyzeFirstByte(int $firstByte): Publish
    {
        $this->logger->debug('Analyzing first byte', [sprintf('%08d', decbin($firstByte))]);

        // RETAIN lives in bit 0: start from "not retained" and only switch it on when the bit is present
        $hasRetainBit = ($firstByte & 1) !== 0;
        $this->message->setRetainFlag(false);
        if ($hasRetainBit) {
            $this->message->setRetainFlag(true);
        }

        // The QoS level is encoded in bits 2 & 1
        $incomingQoSLevel = $this->determineIncomingQoSLevel($firstByte);
        $this->message->setQoSLevel($incomingQoSLevel);

        // DUP lives in bit 3, but it only means something for QoS > 0; for QoS 0 it is always considered unset
        $this->isRedelivery = ($firstByte & 8) !== 0 && $this->message->getQoSLevel() !== 0;

        return $this;
    }
184
185
    /**
     * Finds out the QoS level in a fixed header for the Publish object
     *
     * The QoS level is stored in bits 2 and 1 of the first byte: 0 = QoS lvl0, 2 = QoS lvl1, 4 = QoS lvl2. Having
     * both bits set would mean QoS level 3, which does not exist.
     *
     * @param int $bitString First byte of the fixed header
     * @return int The QoS level: 0, 1 or 2
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel When both QoS bits are set
     */
    private function determineIncomingQoSLevel(int $bitString): int
    {
        // Isolate the two QoS bits before doing anything else
        $qosBits = $bitString & 6;

        // Both bits set would be QoS lvl 3, which does not exist: throw exception
        if ($qosBits === 6) {
            throw new InvalidQoSLevel('Invalid QoS level "' . $bitString . '" found (both bits set?)');
        }

        // Shift the isolated bits one position to the right: 4 becomes 2, 2 becomes 1 and 0 stays 0.
        // Masking first is required: "$bitString & 4 / 2" is evaluated as "$bitString & (4 / 2)"
        return $qosBits >> 1;
    }
202
203
    /**
     * Fills in the Readable object with the data of a received PUBLISH packet
     *
     * Expected layout of the raw data: byte 0 holds the flags, bytes 2 and 3 the topic size, the topic name starts at
     * byte 4 and, only for QoS > 0, it is followed by the 2 bytes of the packet identifier. Everything after that is
     * the payload.
     * NOTE(review): these fixed offsets only hold while the "remaining length" field occupies a single byte and
     * only the low byte of the topic size is read - confirm how bigger packets reach this method.
     *
     * @param string $rawMQTTHeaders
     * @return ReadableContentInterface
     * @throws \unreal4u\MQTT\Exceptions\InvalidQoSLevel
     */
    public function fillObject(string $rawMQTTHeaders): ReadableContentInterface
    {
        $this->message = new Message();
        $this->analyzeFirstByte(\ord($rawMQTTHeaders[0]));

        // Topic size is always the 3rd byte
        $topicSize = \ord($rawMQTTHeaders[3]);

        // 2 (fixed header) + 2 (topic size): the topic name always starts here, whatever the QoS level is
        $topicStartPosition = 4;
        $payloadStartPosition = $topicStartPosition + $topicSize;

        if ($this->message->getQoSLevel() > 0) {
            // The packet identifier comes right after the topic name, most significant byte first
            $this->packetIdentifier = (\ord($rawMQTTHeaders[$payloadStartPosition]) << 8)
                | \ord($rawMQTTHeaders[$payloadStartPosition + 1]);
            $payloadStartPosition += 2;
        }

        $this->logger->debug('Determined headers', [
            'topicSize' => $topicSize,
            'QoSLevel' => $this->message->getQoSLevel(),
            'isDuplicate' => $this->isRedelivery,
            'isRetained' => $this->message->isRetained(),
            'packetIdentifier' => $this->packetIdentifier,
        ]);
        $payload = clone $this->payloadType;

        // substr() returns false instead of an empty string on PHP < 8 when there is nothing left to read (e.g. an
        // empty payload); cast it so that the string-only setters do not fail under strict types
        $this->message->setPayload($payload->setPayload((string) substr($rawMQTTHeaders, $payloadStartPosition)));
        $this->message->setTopicName((string) substr($rawMQTTHeaders, $topicStartPosition, $topicSize));

        return $this;
    }
238
239
    /**
     * Answers a received PUBLISH packet according to its QoS level
     *
     * QoS level 0 needs no answer, QoS level 1 is answered with a PubAck and QoS level 2 with a PubRec. While the
     * answer is being sent the client is put in blocking mode; non-blocking mode is set again afterwards, even when
     * sending fails.
     *
     * @inheritdoc
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function performSpecialActions(Client $client, WritableContentInterface $originalRequest): bool
    {
        $qosLevel = $this->message->getQoSLevel();
        if ($qosLevel === 0) {
            $this->logger->debug('No response needed', ['qosLevel' => $qosLevel]);
            return true;
        }

        $client->setBlocking(true);
        try {
            if ($qosLevel === 1) {
                $this->logger->debug('Responding with PubAck', ['qosLevel' => $qosLevel]);
                $client->sendData($this->composePubAckAnswer());
            } elseif ($qosLevel === 2) {
                // NOTE(review): unlike the PubAck, this PubRec does not get the packet identifier of the received
                // packet assigned here - confirm whether it is set somewhere else
                $this->logger->debug('Responding with PubRec', ['qosLevel' => $qosLevel]);
                $client->sendData(new PubRec($this->logger));
            }
        } finally {
            // Never leave the connection in blocking mode, not even when sending the answer throws
            $client->setBlocking(false);
        }

        return true;
    }
263
264
    /**
     * Creates the PubAck that acknowledges this packet: it carries the packet identifier this object currently holds
     *
     * @return PubAck
     */
    private function composePubAckAnswer(): PubAck
    {
        $answer = new PubAck($this->logger);
        $answer->packetIdentifier = $this->packetIdentifier;

        return $answer;
    }
274
}
275