1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
namespace unreal4u\MQTT\Protocol; |
6
|
|
|
|
7
|
|
|
use unreal4u\MQTT\Application\EmptyReadableResponse; |
8
|
|
|
use unreal4u\MQTT\Application\Message; |
9
|
|
|
use unreal4u\MQTT\Application\SimplePayload; |
10
|
|
|
use unreal4u\MQTT\Client; |
11
|
|
|
use unreal4u\MQTT\Internals\ProtocolBase; |
12
|
|
|
use unreal4u\MQTT\Internals\ReadableContent; |
13
|
|
|
use unreal4u\MQTT\Internals\ReadableContentInterface; |
14
|
|
|
use unreal4u\MQTT\Internals\WritableContent; |
15
|
|
|
use unreal4u\MQTT\Internals\WritableContentInterface; |
16
|
|
|
use unreal4u\MQTT\Utilities; |
17
|
|
|
|
18
|
|
|
final class Publish extends ProtocolBase implements ReadableContentInterface, WritableContentInterface |
19
|
|
|
{ |
20
|
|
|
use ReadableContent; |
21
|
|
|
use WritableContent; |
22
|
|
|
|
23
|
|
|
const CONTROL_PACKET_VALUE = 3; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* Contains the message to be sent |
27
|
|
|
* @var Message |
28
|
|
|
*/ |
29
|
|
|
private $message; |
30
|
|
|
|
31
|
|
|
public $packetIdentifier = 0; |
32
|
|
|
|
33
|
|
|
public $isRedelivery = false; |
34
|
|
|
|
35
|
|
|
public function createVariableHeader(): string |
36
|
|
|
{ |
37
|
|
|
if ($this->message === null) { |
38
|
|
|
throw new \InvalidArgumentException('You must at least provide a message object with a topic name'); |
39
|
|
|
} |
40
|
|
|
|
41
|
|
|
$bitString = $this->createUTF8String($this->message->getTopicName()); |
42
|
|
|
// Reset the special flags should the object be reused with another message |
43
|
|
|
$this->specialFlags = 0; |
44
|
|
|
|
45
|
|
|
if ($this->isRedelivery) { |
46
|
|
|
$this->logger->debug('Activating redelivery bit'); |
47
|
|
|
// DUP flag: if the message is a re-delivery, mark it as such |
48
|
|
|
$this->specialFlags |= 8; |
49
|
|
|
} |
50
|
|
|
|
51
|
|
|
// Check QoS level and perform the corresponding actions |
52
|
|
|
switch ($this->message->getQoSLevel()) { |
53
|
|
View Code Duplication |
case 1: |
|
|
|
|
54
|
|
|
$this->specialFlags |= 2; |
55
|
|
|
$this->logger->debug('Activating QoS level 1 bit', ['specialFlags' => $this->specialFlags]); |
56
|
|
|
$this->packetIdentifier++; |
57
|
|
|
$bitString .= Utilities::convertNumberToBinaryString($this->packetIdentifier); |
58
|
|
|
break; |
59
|
|
View Code Duplication |
case 2: |
|
|
|
|
60
|
|
|
$this->specialFlags |= 4; |
61
|
|
|
$this->logger->debug('Activating QoS level 2 bit', ['specialFlags' => $this->specialFlags]); |
62
|
|
|
$this->packetIdentifier++; |
63
|
|
|
$bitString .= Utilities::convertNumberToBinaryString($this->packetIdentifier); |
64
|
|
|
break; |
65
|
|
|
default: |
66
|
|
|
break; |
67
|
|
|
} |
68
|
|
|
|
69
|
|
|
if ($this->message->mustRetain()) { |
70
|
|
|
// RETAIN flag: should the server retain the message? |
71
|
|
|
$this->specialFlags |= 1; |
72
|
|
|
$this->logger->debug('Activating retain flag', ['specialFlags' => $this->specialFlags]); |
73
|
|
|
} |
74
|
|
|
|
75
|
|
|
$this->logger->info('Variable header created', ['specialFlags' => $this->specialFlags]); |
76
|
|
|
|
77
|
|
|
return $bitString; |
78
|
|
|
} |
79
|
|
|
|
80
|
|
|
public function createPayload(): string |
81
|
|
|
{ |
82
|
|
|
if (!$this->message->validateMessage()) { |
83
|
|
|
throw new \InvalidArgumentException('Invalid message'); |
84
|
|
|
} |
85
|
|
|
|
86
|
|
|
return $this->message->getPayload(); |
87
|
|
|
} |
88
|
|
|
|
89
|
|
|
/** |
90
|
|
|
* QoS level 0 does not have to wait for a answer, so return false. Any other QoS level returns true |
91
|
|
|
* @return bool |
92
|
|
|
*/ |
93
|
|
|
public function shouldExpectAnswer(): bool |
94
|
|
|
{ |
95
|
|
|
return !($this->message->getQoSLevel() === 0); |
96
|
|
|
} |
97
|
|
|
|
98
|
|
|
public function expectAnswer(string $data): ReadableContentInterface |
99
|
|
|
{ |
100
|
|
|
if ($this->shouldExpectAnswer() === false) { |
101
|
|
|
return new EmptyReadableResponse($this->logger); |
102
|
|
|
} |
103
|
|
|
|
104
|
|
|
$pubAck = new PubAck($this->logger); |
105
|
|
|
$pubAck->populate($data); |
106
|
|
|
return $pubAck; |
107
|
|
|
} |
108
|
|
|
|
109
|
|
|
/** |
110
|
|
|
* Sets the to be sent message |
111
|
|
|
* |
112
|
|
|
* @param Message $message |
113
|
|
|
* @return WritableContentInterface |
114
|
|
|
*/ |
115
|
|
|
public function setMessage(Message $message): WritableContentInterface |
116
|
|
|
{ |
117
|
|
|
$this->message = $message; |
118
|
|
|
return $this; |
119
|
|
|
} |
120
|
|
|
|
121
|
|
|
/** |
122
|
|
|
* Gets the set message |
123
|
|
|
* |
124
|
|
|
* @return Message |
125
|
|
|
*/ |
126
|
|
|
public function getMessage(): Message |
127
|
|
|
{ |
128
|
|
|
return $this->message; |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
/** |
132
|
|
|
* Will perform sanity checks and fill in the Readable object with data |
133
|
|
|
* @return ReadableContentInterface |
134
|
|
|
*/ |
135
|
|
|
public function fillObject(): ReadableContentInterface |
136
|
|
|
{ |
137
|
|
|
$topicSize = \ord($this->rawMQTTHeaders{3}); |
138
|
|
|
|
139
|
|
|
$this->message = new Message(); |
140
|
|
|
$this->message->setPayload(new SimplePayload(substr($this->rawMQTTHeaders, 4 + $topicSize))); |
|
|
|
|
141
|
|
|
$this->message->setTopicName(substr($this->rawMQTTHeaders, 4, $topicSize)); |
|
|
|
|
142
|
|
|
|
143
|
|
|
return $this; |
144
|
|
|
} |
145
|
|
|
|
146
|
|
|
/** |
147
|
|
|
* @inheritdoc |
148
|
|
|
*/ |
149
|
|
|
public function performSpecialActions(Client $client, WritableContentInterface $originalRequest): bool |
150
|
|
|
{ |
151
|
|
|
if ($this->message->getQoSLevel() === 0) { |
152
|
|
|
$this->logger->debug('No response needed', ['qosLevel', $this->message->getQoSLevel()]); |
153
|
|
|
} else { |
154
|
|
|
$client->setBlocking(true); |
155
|
|
|
if ($this->message->getQoSLevel() === 1) { |
156
|
|
|
$this->logger->debug('Responding with PubAck', ['qosLevel' => $this->message->getQoSLevel()]); |
157
|
|
|
$client->sendData($this->composePubAckAnswer()); |
158
|
|
|
} elseif ($this->message->getQoSLevel() === 2) { |
159
|
|
|
$this->logger->debug('Responding with PubRec', ['qosLevel' => $this->message->getQoSLevel()]); |
160
|
|
|
$client->sendData(new PubRec($this->logger)); |
161
|
|
|
} |
162
|
|
|
$client->setBlocking(false); |
163
|
|
|
} |
164
|
|
|
|
165
|
|
|
return true; |
166
|
|
|
} |
167
|
|
|
|
168
|
|
|
/** |
169
|
|
|
* Composes a PubAck answer with the same packetIdentifier as what we received |
170
|
|
|
* @return PubAck |
171
|
|
|
*/ |
172
|
|
|
private function composePubAckAnswer(): PubAck |
173
|
|
|
{ |
174
|
|
|
$pubAck = new PubAck($this->logger); |
175
|
|
|
$pubAck->packetIdentifier = $this->packetIdentifier; |
176
|
|
|
return $pubAck; |
177
|
|
|
} |
178
|
|
|
} |
179
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.