Completed
Push — master ( 368962...ec7bc7 )
by Camilo
02:18
created

Subscribe::getTopics()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 2
nop 0
crap 6
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\Topic;
9
use unreal4u\MQTT\Internals\ClientInterface;
10
use unreal4u\MQTT\Internals\EventManager;
11
use unreal4u\MQTT\Internals\ProtocolBase;
12
use unreal4u\MQTT\Internals\ReadableContentInterface;
13
use unreal4u\MQTT\Internals\WritableContent;
14
use unreal4u\MQTT\Internals\WritableContentInterface;
15
use unreal4u\MQTT\Utilities;
16
17
final class Subscribe extends ProtocolBase implements WritableContentInterface
18
{
19
    use WritableContent;
20
21
    const CONTROL_PACKET_VALUE = 8;
22
23
    private $packetIdentifier = 0;
24
25
    /**
26
     * An array of topics on which to subscribe to
27
     * @var Topic[]
28
     */
29
    private $topics = [];
30
31
    /**
32
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
33
     * @var bool
34
     */
35
    private $shouldLoop = true;
36
37
    /**
38
     * @return string
39
     * @throws \OutOfRangeException
40
     * @throws \Exception
41
     */
42
    public function createVariableHeader(): string
43
    {
44
        // Subscribe must always send a 2 flag
45
        $this->specialFlags = 2;
46
47
        // Assign a packet identifier automatically if none has been assigned yet
48
        if ($this->packetIdentifier === 0) {
49
            $this->setPacketIdentifier(random_int(0, 65535));
50
        }
51
52
        return Utilities::convertNumberToBinaryString($this->packetIdentifier);
53
    }
54
55
    public function createPayload(): string
56
    {
57
        $output = '';
58
        foreach ($this->getTopics() as $topic) {
59
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
60
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
61
        }
62
        return $output;
63
    }
64
65
    /**
66
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
67
     *
68
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
69
     * back
70
     *
71
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
72
     * @return bool
73
     */
74
    public function shouldExpectAnswer(): bool
75
    {
76
        return true;
77
    }
78
79
    /**
80
     * SUBSCRIBE Control Packets MUST contain a non-zero 16-bit Packet Identifier
81
     *
82
     * @param int $packetIdentifier
83
     * @return Subscribe
84
     * @throws \OutOfRangeException
85
     */
86
    public function setPacketIdentifier(int $packetIdentifier): Subscribe
87
    {
88
        if ($packetIdentifier > 65535 || $packetIdentifier < 1) {
89
            throw new \OutOfRangeException('Packet identifier must fit within 2 bytes');
90
        }
91
92
        $this->packetIdentifier = $packetIdentifier;
93
        $this->logger->debug('Setting packet identifier', ['current' => $this->packetIdentifier]);
94
95
        return $this;
96
    }
97
98
    public function getPacketIdentifier(): int
99
    {
100
        return $this->packetIdentifier;
101
    }
102
103
    /**
104
     * Performs a check on the socket connection and returns either the contents or an empty object
105
     *
106
     * @param ClientInterface $client
107
     * @return ReadableContentInterface
108
     * @throws \DomainException
109
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
110
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
111
     */
112
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
113
    {
114
        $this->checkPingTime($client);
115
        $publishPacketControlField = $client->readSocketData(1);
116
        $eventManager = new EventManager($this->logger);
117
118
        if ((\ord($publishPacketControlField) & 255) > 0) {
119
            $this->logger->debug('Event received', ['ordValue' => \ord($publishPacketControlField) & 255]);
120
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
121
        }
122
123
        $this->logger->debug('No valid publish packet control field found, returning empty response');
124
        return new EmptyReadableResponse($this->logger);
125
    }
126
127
    /**
128
     * Loop and yields different type of results back whenever they are available
129
     *
130
     * @param ClientInterface $client
131
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
132
     * @param callable|null $hookBeforeLoop
133
     * @return \Generator
134
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
135
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
136
     * @throws \DomainException
137
     */
138
    public function loop(
139
        ClientInterface $client,
140
        int $idleMicroseconds = 100000,
141
        callable $hookBeforeLoop = null
142
    ): \Generator {
143
        $this->shouldLoop = true;
144
        // First of all: subscribe
145
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
146
        $readableContent = $client->sendData($this);
147
148
        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
149
        if (\is_callable($hookBeforeLoop)) {
150
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
151
            $hookBeforeLoop($this->logger);
152
        }
153
154
        while ($this->shouldLoop === true) {
0 ignored issues
show
introduced by
The condition $this->shouldLoop === true can never be false.
Loading history...
155
            $this->logger->debug('++Loop++');
156
            if ($readableContent instanceof Publish) {
157
                // Only if we receive a Publish event from the broker, yield the contents
158
                yield $readableContent->getMessage();
159
            } else {
160
                // Only wait for a certain amount of time if there was nothing in the queue
161
                $this->logger->debug('Got an incoming object, but not a message, disregarding', [
162
                    'class' => \get_class($readableContent),
163
                ]);
164
                usleep($idleMicroseconds);
165
            }
166
167
            $readableContent = $this->checkForEvent($client);
168
        }
169
    }
170
171
    /**
172
     * Call this function to break out of the loop cleanly
173
     *
174
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
175
     * cleanly without the need to disconnect from the broker.
176
     *
177
     * @return Subscribe
178
     */
179
    public function breakLoop(): self
180
    {
181
        $this->shouldLoop = false;
182
        return $this;
183
    }
184
185
    /**
186
     * A subscription is based on filters, this function allows us to pass on filters
187
     *
188
     * @param Topic[] $topics
189
     * @return Subscribe
190
     */
191
    public function addTopics(Topic ...$topics): Subscribe
192
    {
193
        $this->topics = $topics;
194
        $this->logger->debug('Topics added', ['totalTopics', $this->getNumberOfTopics()]);
195
196
        return $this;
197
    }
198
199
    /**
200
     * Returns the current number of topics
201
     *
202
     * @return int
203
     */
204
    public function getNumberOfTopics(): int
205
    {
206
        return count($this->topics);
207
    }
208
209
    /**
210
     * Returns the topics in the order they were inserted / requested
211
     *
212
     * The order is important because SUBACK will return the status code for each topic in this order, without
213
     * explicitly identifying which topic is which
214
     *
215
     * @return \Generator|Topic[]
216
     */
217
    public function getTopics(): \Generator
218
    {
219
        foreach ($this->topics as $topic) {
220
            yield $topic;
221
        }
222
    }
223
224
    /**
225
     * @param ClientInterface $client
226
     * @return bool
227
     */
228
    protected function checkPingTime(ClientInterface $client): bool
229
    {
230
        if ($client->isItPingTime()) {
231
            $this->logger->info('Pinging is needed, sending PingReq');
232
            $client->sendData(new PingReq($this->logger));
233
        }
234
235
        return true;
236
    }
237
}
238