Completed
Push — master ( bb30b8...368962 )
by Camilo
02:11
created

Subscribe::breakLoop()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\Topic;
9
use unreal4u\MQTT\Internals\ClientInterface;
10
use unreal4u\MQTT\Internals\EventManager;
11
use unreal4u\MQTT\Internals\ProtocolBase;
12
use unreal4u\MQTT\Internals\ReadableContentInterface;
13
use unreal4u\MQTT\Internals\WritableContent;
14
use unreal4u\MQTT\Internals\WritableContentInterface;
15
use unreal4u\MQTT\Utilities;
16
17
/**
 * SUBSCRIBE control packet: builds the variable header and payload for a subscription request and offers a
 * generator based loop that yields the messages of incoming Publish packets.
 *
 * NOTE(review): $this->logger, $this->specialFlags and createUTF8String() are not declared in this class; they
 * presumably come from ProtocolBase and/or the WritableContent trait. Confirm against those files.
 */
final class Subscribe extends ProtocolBase implements WritableContentInterface
{
    use WritableContent;

    /**
     * Control packet type of this packet (8 is the SUBSCRIBE packet type in MQTT 3.1.1)
     */
    const CONTROL_PACKET_VALUE = 8;

    /**
     * Packet identifier that is sent along in the variable header; 0 means "not assigned yet"
     * @var int
     */
    private $packetIdentifier = 0;

    /**
     * An array of topics on which to subscribe to
     * @var Topic[]
     */
    private $topics = [];

    /**
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
     * @var bool
     */
    private $shouldLoop = true;

    /**
     * Creates the variable header, which for this packet consists only of the packet identifier
     *
     * Sets the special flags to 2 as a side effect and assigns a random packet identifier when none was set before.
     *
     * @return string
     * @throws \OutOfRangeException
     * @throws \Exception
     */
    public function createVariableHeader(): string
    {
        // Subscribe must always send a 2 flag
        $this->specialFlags = 2;

        // Assign a packet identifier automatically if none has been assigned yet
        if ($this->packetIdentifier === 0) {
            // The lower bound must be 1: setPacketIdentifier() rejects 0 with an \OutOfRangeException
            $this->setPacketIdentifier(random_int(1, 65535));
        }

        return Utilities::convertNumberToBinaryString($this->packetIdentifier);
    }

    /**
     * Creates the payload: for every topic, the topic name as an UTF-8 string followed by one byte with the QoS level
     *
     * Returns an empty string when no topics have been set.
     *
     * @return string
     */
    public function createPayload(): string
    {
        $output = '';
        foreach ($this->topics as $topic) {
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
        }

        return $output;
    }

    /**
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
     *
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
     * back
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
     * @return bool Always true
     */
    public function shouldExpectAnswer(): bool
    {
        return true;
    }

    /**
     * SUBSCRIBE Control Packets MUST contain a non-zero 16-bit Packet Identifier
     *
     * @param int $packetIdentifier A value between 1 and 65535, both inclusive
     * @return Subscribe
     * @throws \OutOfRangeException When the given identifier is lower than 1 or higher than 65535
     */
    public function setPacketIdentifier(int $packetIdentifier): Subscribe
    {
        if ($packetIdentifier > 65535 || $packetIdentifier < 1) {
            throw new \OutOfRangeException('Packet identifier must fit within 2 bytes');
        }

        $this->packetIdentifier = $packetIdentifier;
        $this->logger->debug('Setting packet identifier', ['current' => $this->packetIdentifier]);

        return $this;
    }

    /**
     * Returns the current packet identifier, which is 0 for as long as none has been assigned
     *
     * @return int
     */
    public function getPacketIdentifier(): int
    {
        return $this->packetIdentifier;
    }

    /**
     * Performs a check on the socket connection and returns either the contents or an empty object
     *
     * Before reading, a PingReq is sent if the client reports that it is time for one. Then a single byte is read:
     * a non-zero byte is handed over to the event manager, anything else results in an empty response.
     *
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \DomainException
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
    {
        $this->checkPingTime($client);
        $publishPacketControlField = $client->readSocketData(1);
        $eventManager = new EventManager($this->logger);

        $controlFieldValue = \ord($publishPacketControlField) & 255;
        if ($controlFieldValue > 0) {
            $this->logger->debug('Event received', ['ordValue' => $controlFieldValue]);
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
        }

        $this->logger->debug('No valid publish packet control field found, returning empty response');
        return new EmptyReadableResponse($this->logger);
    }

    /**
     * Loop and yields different type of results back whenever they are available
     *
     * Sends this Subscribe packet first, optionally runs the given hook and then keeps polling the socket. Only the
     * message of a Publish object is yielded; any other object is logged and followed by an idle period.
     *
     * The loop ends once breakLoop() has been called. When that happens while the consumer is handling a yielded
     * message, no further data is read from the socket.
     *
     * @param ClientInterface $client
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
     * @param callable|null $hookBeforeLoop Called once with the logger as only argument, right before looping
     * @return \Generator
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     * @throws \DomainException
     */
    public function loop(
        ClientInterface $client,
        int $idleMicroseconds = 100000,
        callable $hookBeforeLoop = null
    ): \Generator {
        $this->shouldLoop = true;
        // First of all: subscribe
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
        $readableContent = $client->sendData($this);

        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
        if (\is_callable($hookBeforeLoop)) {
            // "Flushing" the socket seems to work in order to call up an Unsubscribe at this stage
            $this->logger->notice('Callable detected, "flushing" socket', ['userFunctionName' => $hookBeforeLoop]);
            $client->readSocketData(4);

            $hookBeforeLoop($this->logger);
        }

        // The flag is flipped from the outside through breakLoop() while this generator is suspended (or by the
        // hook above), so despite what static analysis may claim this condition can become false
        while ($this->shouldLoop === true) {
            $this->logger->debug('++Loop++');
            if ($readableContent instanceof Publish) {
                // Only if we receive a Publish event from the broker, yield the contents
                yield $readableContent->getMessage();

                // Stop right away if the consumer asked to break: checking the socket once more would read (and
                // throw away) whatever is waiting there
                if ($this->shouldLoop !== true) {
                    break;
                }
            } else {
                // Only wait for a certain amount of time if there was nothing in the queue
                $this->logger->debug('Got an incoming object, but not a message, disregarding', [
                    'class' => \get_class($readableContent),
                ]);
                usleep($idleMicroseconds);
            }

            $readableContent = $this->checkForEvent($client);
        }
    }

    /**
     * Call this function to break out of the loop cleanly
     *
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
     * cleanly without the need to disconnect from the broker.
     *
     * @return Subscribe
     */
    public function breakLoop(): self
    {
        $this->shouldLoop = false;
        return $this;
    }

    /**
     * A subscription is based on filters, this function allows us to pass on filters
     *
     * Be aware that the given topics replace any topics that were set by a previous call, they are not appended.
     *
     * @param Topic ...$topics
     * @return Subscribe
     */
    public function addTopics(Topic ...$topics): Subscribe
    {
        $this->topics = $topics;
        // The context must be a key => value pair, otherwise the count ends up without a label in the log
        $this->logger->debug('Topics added', ['totalTopics' => \count($this->topics)]);

        return $this;
    }

    /**
     * Sends a PingReq through the client when the client indicates that it is time to do so
     *
     * @param ClientInterface $client
     * @return bool Always true, regardless of whether a PingReq was sent
     */
    protected function checkPingTime(ClientInterface $client): bool
    {
        if ($client->isItPingTime()) {
            $this->logger->info('Pinging is needed, sending PingReq');
            $client->sendData(new PingReq($this->logger));
        }

        return true;
    }
}
217