1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
namespace unreal4u\MQTT; |
6
|
|
|
|
7
|
|
|
use unreal4u\MQTT\Application\EmptyWritableResponse; |
8
|
|
|
use unreal4u\MQTT\Exceptions\NotConnected; |
9
|
|
|
use unreal4u\MQTT\Exceptions\ServerClosedConnection; |
10
|
|
|
use unreal4u\MQTT\Internals\ClientInterface; |
11
|
|
|
use unreal4u\MQTT\Internals\ProtocolBase; |
12
|
|
|
use unreal4u\MQTT\Internals\ReadableContentInterface; |
13
|
|
|
use unreal4u\MQTT\Internals\WritableContentInterface; |
14
|
|
|
use unreal4u\MQTT\Protocol\Connect; |
15
|
|
|
use unreal4u\MQTT\Protocol\Disconnect; |
16
|
|
|
|
17
|
|
|
/**
 * Class Client
 *
 * Owns the stream socket towards the broker and pushes writable protocol objects through it, handing the broker's
 * answer back as readable protocol objects.
 *
 * @package unreal4u\MQTT
 */
final class Client extends ProtocolBase implements ClientInterface
{
    /**
     * Where all the magic happens: the stream socket towards the broker, null while there is none
     * @var resource|null
     */
    private $socket;

    /**
     * Fast way to know whether we are connected or not
     * @var bool
     */
    private $isConnected = false;

    /**
     * Mirrors the blocking status that was last set through enableSynchronousTransfer()
     * @var bool
     */
    private $isCurrentlyLocked = false;

    /**
     * Annotates the last time there was known to be communication with the MQTT server, null until the first update
     * @var \DateTimeImmutable|null
     */
    private $lastCommunication;

    /**
     * Internal holder of connection parameters, null until a Connect object has been processed
     * @var Connect\Parameters|null
     */
    private $connectionParameters;

    /**
     * Temporary holder for async requests so that they can be handled synchronously, indexed by control packet value
     * @var WritableContentInterface[]
     */
    private $objectStack = [];

    /**
     * Sends a Disconnect packet to the broker if this client is still flagged as connected
     *
     * @inheritdoc
     * @throws \LogicException
     * @throws \unreal4u\MQTT\Exceptions\NonMatchingPacketIdentifiers
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
     */
    public function __destruct()
    {
        if ($this->isConnected() === true) {
            $this->logger->info('Currently connected to broker, disconnecting from it');

            $this->processObject(new Disconnect($this->logger));
        }
    }

    /**
     * Shuts down both directions of the socket
     *
     * @inheritdoc
     * @return bool False when there is no socket to shut down, the result of stream_socket_shutdown() otherwise
     */
    public function shutdownConnection(): bool
    {
        if (!\is_resource($this->socket)) {
            // Nothing to shut down: passing a non-resource to stream_socket_shutdown() would be a TypeError
            return false;
        }

        return stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
    }

    /**
     * Reads up to $bytes bytes from the socket
     *
     * @inheritdoc
     * @return string The bytes that were read; an empty string when nothing could be read
     * @throws \unreal4u\MQTT\Exceptions\NotConnected When there is no socket to read from
     */
    public function readBrokerData(int $bytes): string
    {
        $this->logger->debug('Reading bytes from socket', [
            'numberOfBytes' => $bytes,
            'isLocked' => $this->isCurrentlyLocked,
        ]);

        if (!\is_resource($this->socket)) {
            $this->logger->alert('Not connected before reading data');
            throw new NotConnected('Please connect before performing any other request');
        }

        if ($bytes < 1) {
            // fread() rejects non-positive lengths; reading nothing simply yields nothing
            return '';
        }

        $readData = fread($this->socket, $bytes);
        if ($readData === false) {
            // fread() signals failure with false, which must never leak through the string return type
            $this->logger->warning('Could not read from socket', ['numberOfBytes' => $bytes]);
            return '';
        }

        return $readData;
    }

    /**
     * Reads the first 4 bytes of the broker's response
     *
     * @inheritdoc
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     */
    public function readBrokerHeader(): string
    {
        $this->logger->debug('Reading header from response');
        return $this->readBrokerData(4);
    }

    /**
     * Writes the sendable message of the given object to the socket and returns the header of the answer, if any
     *
     * @inheritdoc
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection When not all bytes could be written
     * @throws \unreal4u\MQTT\Exceptions\NotConnected When there is no socket to write to
     */
    public function sendBrokerData(WritableContentInterface $object): string
    {
        // is_resource() also rejects anything that is not null but still isn't a usable socket
        if (!\is_resource($this->socket)) {
            $this->logger->alert('Not connected before sending data');
            throw new NotConnected('Please connect before performing any other request');
        }

        $writableString = $object->createSendableMessage();
        $sizeOfString = \strlen($writableString);
        // fwrite() returns false on failure, which never equals the expected size and is therefore caught below
        $writtenBytes = fwrite($this->socket, $writableString, $sizeOfString);
        if ($writtenBytes !== $sizeOfString) {
            $this->logger->error('Written bytes do NOT correspond with size of string!', [
                'writtenBytes' => $writtenBytes,
                'sizeOfString' => $sizeOfString,
            ]);

            throw new ServerClosedConnection('The server may have disconnected the current client');
        }

        $this->logger->debug('Sent data to socket', ['writtenBytes' => $writtenBytes, 'sizeOfString' => $sizeOfString]);
        return $this->checkAndReturnAnswer($object);
    }

    /**
     * Checks on the writable object whether we should wait for an answer and either wait or return an empty string
     *
     * The socket is switched to blocking mode for the duration of the read and switched back afterwards, also when
     * the read itself fails.
     *
     * @param WritableContentInterface $object
     * @return string
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     */
    private function checkAndReturnAnswer(WritableContentInterface $object): string
    {
        if ($object->shouldExpectAnswer() !== true) {
            return '';
        }

        $this->enableSynchronousTransfer(true);
        try {
            return $this->readBrokerHeader();
        } finally {
            $this->enableSynchronousTransfer(false);
        }
    }

    /**
     * Checks for socket error connections, will throw an exception if any is found
     *
     * @param int $errorCode
     * @param string $errorDescription
     * @return Client
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     */
    private function checkForConnectionErrors(int $errorCode, string $errorDescription): self
    {
        if ($errorCode !== 0) {
            $this->logger->critical('Could not connect to broker', [
                'errorCode' => $errorCode,
                'errorDescription' => $errorDescription,
            ]);

            throw new NotConnected('Could not connect to broker: ' . $errorDescription, $errorCode);
        }

        return $this;
    }

    /**
     * Special handling of the connect part: create the socket
     *
     * The socket property is only overwritten once the connection attempt has succeeded, so a failed attempt never
     * leaves a non-resource value behind.
     *
     * @param Connect $connection
     * @return bool
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    private function generateSocketConnection(Connect $connection): bool
    {
        $this->logger->debug('Creating socket connection');
        $this->connectionParameters = $connection->getConnectionParameters();

        $errorCode = 0;
        $errorDescription = '';
        $socket = stream_socket_client(
            $this->connectionParameters->getConnectionUrl(),
            $errorCode,
            $errorDescription,
            60,
            STREAM_CLIENT_CONNECT
        );

        $this->checkForConnectionErrors($errorCode, $errorDescription);

        if ($socket === false) {
            // stream_socket_client() may fail before the connect() call, in which case the error code stays at 0
            $this->logger->critical('Could not connect to broker', [
                'errorCode' => $errorCode,
                'errorDescription' => $errorDescription,
            ]);

            throw new NotConnected('Could not connect to broker: the socket could not be initialized');
        }

        $this->socket = $socket;

        // NOTE(review): a keep alive period of 0 results in a read timeout of 0 seconds - confirm this is intended
        stream_set_timeout($this->socket, (int)floor($this->connectionParameters->getKeepAlivePeriod() * 1.5));

        $this->logger->debug('Created socket connection successfully, continuing', stream_get_meta_data($this->socket));
        return true;
    }

    /**
     * Switches the socket between blocking (true) and non-blocking (false) mode and remembers the new status
     *
     * @inheritdoc
     */
    public function enableSynchronousTransfer(bool $newStatus): ClientInterface
    {
        $this->logger->debug('Setting new blocking status', ['newStatus' => $newStatus]);
        stream_set_blocking($this->socket, $newStatus);
        $this->isCurrentlyLocked = $newStatus;
        return $this;
    }

    /**
     * Stuff that has to happen before we actually begin sending data through our socket
     *
     * Registers the object on the stack under its control packet value and, for Connect objects, creates the socket.
     *
     * @param WritableContentInterface $object
     * @return Client
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    private function preSocketCommunication(WritableContentInterface $object): self
    {
        $this->objectStack[$object::getControlPacketValue()] = $object;

        if ($object instanceof Connect) {
            $this->generateSocketConnection($object);
        }

        return $this;
    }

    /**
     * Checks in the object stack whether there is some method that might issue the current ReadableContent
     *
     * A matching origin packet is removed from the stack and returned. When there is none, an empty writable response
     * is returned instead; this is logged as a warning unless the origin identifier is 0.
     *
     * @param ReadableContentInterface $readableContent
     * @return WritableContentInterface
     */
    private function postSocketCommunication(ReadableContentInterface $readableContent): WritableContentInterface
    {
        $originPacketIdentifier = $readableContent->getOriginControlPacket();

        if (array_key_exists($originPacketIdentifier, $this->objectStack)) {
            $this->logger->debug('Origin packet found, returning it', ['originKey' => $originPacketIdentifier]);
            $originPacket = $this->objectStack[$originPacketIdentifier];
            unset($this->objectStack[$originPacketIdentifier]);

            return $originPacket;
        }

        if ($originPacketIdentifier !== 0) {
            // A missing origin is tolerated on purpose: it is reported, but does not abort the processing
            $this->logger->warning('No origin packet found!', [
                'originKey' => $originPacketIdentifier,
                'stack' => array_keys($this->objectStack),
            ]);
        }

        return new EmptyWritableResponse($this->logger);
    }

    /**
     * Sends the given object to the broker and returns the readable content the object builds from the answer
     *
     * @inheritdoc
     * @throws \LogicException
     * @throws \unreal4u\MQTT\Exceptions\NonMatchingPacketIdentifiers
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     */
    public function processObject(WritableContentInterface $object): ReadableContentInterface
    {
        $currentObject = \get_class($object);
        $this->logger->debug('Validating object', ['object' => $currentObject]);

        $this->preSocketCommunication($object);

        $this->logger->info('About to send data', ['object' => $currentObject]);
        $readableContent = $object->expectAnswer($this->sendBrokerData($object), $this);
        /*
         * Some objects must perform certain actions on the connection, for example:
         * - ConnAck must set the connected bit
         * - PingResp must reset the internal last-communication datetime
         */
        $this->logger->debug('Checking stack and performing special operations', [
            'originObject' => $currentObject,
            'responseObject' => \get_class($readableContent),
        ]);

        $readableContent->performSpecialActions($this, $this->postSocketCommunication($readableContent));

        return $readableContent;
    }

    /**
     * Tells whether the keep alive period has elapsed since the last known communication with the broker
     *
     * @inheritdoc
     * @return bool Always false while no communication or no connection parameters have been registered yet
     */
    public function isItPingTime(): bool
    {
        if ($this->lastCommunication === null || $this->connectionParameters === null) {
            // Without a reference point there is nothing to measure against
            return false;
        }

        $keepAlivePeriod = $this->connectionParameters->getKeepAlivePeriod();
        $secondsDifference = time() - $this->lastCommunication->getTimestamp();
        $this->logger->debug('Checking time difference', [
            'secondsDifference' => $secondsDifference,
            'keepAlivePeriod' => $keepAlivePeriod,
        ]);

        return $this->isConnected() && $keepAlivePeriod > 0 && $secondsDifference >= $keepAlivePeriod;
    }

    /**
     * Sets the last communication timestamp to the current moment, with microsecond precision
     *
     * @inheritdoc
     */
    public function updateLastCommunication(): ClientInterface
    {
        $previousValue = null;
        if ($this->lastCommunication !== null) {
            $previousValue = $this->lastCommunication->format('Y-m-d H:i:s.u');
        }

        // "now" does not support microseconds, so create the timestamp with a format that does
        $currentMoment = \DateTimeImmutable::createFromFormat('U.u', sprintf('%.6F', microtime(true)));
        if ($currentMoment === false) {
            // createFromFormat() signals a parse failure with false: never store that in the property
            $currentMoment = new \DateTimeImmutable('now');
        }

        $this->lastCommunication = $currentMoment;
        $this->logger->debug('Updating internal last communication timestamp', [
            'previousValue' => $previousValue,
            'currentValue' => $this->lastCommunication->format('Y-m-d H:i:s.u'),
        ]);
        return $this;
    }

    /**
     * Sets the connected flag; the socket reference is dropped when the flag is set to false
     *
     * @inheritdoc
     */
    public function setConnected(bool $isConnected): ClientInterface
    {
        $this->logger->debug('Setting internal connected property', ['connected' => $isConnected]);
        $this->isConnected = $isConnected;
        if ($this->isConnected() === false) {
            $this->socket = null;
        }

        return $this;
    }

    /**
     * @inheritdoc
     */
    public function isConnected(): bool
    {
        return $this->isConnected;
    }
}
353
|
|
|
|
Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.
For example, imagine you have a variable `$accountId` that can either hold an `Id` object or `false` (if there is no account id yet). Your code now assigns that value to the `id` property of an instance of the `Account` class. This class holds a proper account, so the id value must no longer be `false`. Either this assignment is in error or a type check should be added for that assignment.