Completed
Push — master ( d939b7...332c9a )
by Camilo
02:32
created

Subscribe::checkPingTime()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 1
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\DataTypes\Topic;
9
use unreal4u\MQTT\Internals\ClientInterface;
10
use unreal4u\MQTT\Internals\EventManager;
11
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
12
use unreal4u\MQTT\Internals\ProtocolBase;
13
use unreal4u\MQTT\Internals\ReadableContentInterface;
14
use unreal4u\MQTT\Internals\WritableContent;
15
use unreal4u\MQTT\Internals\WritableContentInterface;
16
17
/**
 * The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
 *
 * Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client
 * in order to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE
 * Packet also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to
 * the Client.
 */
final class Subscribe extends ProtocolBase implements WritableContentInterface
{
    use WritableContent, PacketIdentifierFunctionality;

    const CONTROL_PACKET_VALUE = 8;

    /**
     * An array of topics on which to subscribe to, kept in insertion order
     * @var Topic[]
     */
    private $topics = [];

    /**
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
     * @var bool
     */
    private $shouldLoop = true;

    /**
     * Creates the variable header of the packet: sets the special flags and returns the packet identifier
     *
     * @return string
     * @throws \OutOfRangeException
     * @throws \Exception
     */
    public function createVariableHeader(): string
    {
        // Subscribe must always send a 2 flag
        $this->specialFlags = 2;
        return $this->getPacketIdentifierBinaryRepresentation();
    }

    /**
     * Creates the payload: for every topic, the encoded topic name immediately followed by one byte with its QoS level
     *
     * Returns an empty string when no topics have been added.
     *
     * @return string
     * @throws \OutOfRangeException
     */
    public function createPayload(): string
    {
        $output = '';
        foreach ($this->getTopics() as $topic) {
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
        }

        return $output;
    }

    /**
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
     *
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
     * back
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
     * @return bool Always true
     */
    public function shouldExpectAnswer(): bool
    {
        return true;
    }

    /**
     * Performs a check on the socket connection and returns either the contents or an empty object
     *
     * Before reading, a PingReq is sent if the client reports that it is time to do so. Afterwards one byte is read
     * from the broker: a non-zero value is handed over to the EventManager for further analysis, anything else
     * results in an EmptyReadableResponse.
     *
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \DomainException
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
    {
        $this->checkPingTime($client);
        $publishPacketControlField = $client->readBrokerData(1);
        $eventManager = new EventManager($this->logger);

        // Calculate the numeric value only once, it is needed for both the check and the log entry
        $controlFieldValue = \ord($publishPacketControlField) & 255;
        if ($controlFieldValue > 0) {
            $this->logger->debug('Event received', [
                'ordValue' => $controlFieldValue,
                'length' => \strlen($publishPacketControlField)
            ]);
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
        }

        $this->logger->debug('No valid publish packet control field found, returning empty response');
        return new EmptyReadableResponse($this->logger);
    }

    /**
     * Loop and yields different type of results back whenever they are available
     *
     * The subscription itself is sent first, after which the optional hook is executed once with the logger as its
     * only argument. From then on, every received Publish object yields its message; if anything else is received,
     * the loop idles for the given amount of microseconds before checking the socket again. The loop runs until
     * breakLoop() is called.
     *
     * @param ClientInterface $client
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
     * @param callable|null $hookBeforeLoop
     * @return \Generator
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     * @throws \DomainException
     */
    public function loop(
        ClientInterface $client,
        int $idleMicroseconds = 100000,
        callable $hookBeforeLoop = null
    ): \Generator {
        // Reset the flag: a previous call to breakLoop() should not prevent a new loop from starting
        $this->shouldLoop = true;
        // First of all: subscribe
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
        $readableContent = $client->processObject($this);

        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
        if (\is_callable($hookBeforeLoop)) {
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
            $hookBeforeLoop($this->logger);
        }

        while ($this->shouldLoop === true) {
            $this->logger->debug('++Loop++');
            if ($readableContent instanceof Publish) {
                $readableContent->performSpecialActions($client, $this);
                // Only if we receive a Publish event from the broker, yield the contents
                yield $readableContent->getMessage();
            } else {
                // Only wait for a certain amount of time if there was nothing in the queue
                \usleep($idleMicroseconds);
            }

            $readableContent = $this->checkForEvent($client);
        }
    }

    /**
     * Call this function to break out of the loop cleanly
     *
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
     * cleanly without the need to disconnect from the broker.
     *
     * @return Subscribe
     */
    public function breakLoop(): self
    {
        $this->shouldLoop = false;
        return $this;
    }

    /**
     * A subscription is based on filters, this function allows us to pass on filters
     *
     * The given topics are appended to the ones that were already set.
     *
     * @param Topic[] $topics
     * @return Subscribe
     */
    public function addTopics(Topic ...$topics): Subscribe
    {
        $this->topics = \array_merge($this->topics, $topics);
        // The context must be a key => value map, otherwise the total gets logged under a numeric index
        $this->logger->debug('Topics added', ['totalTopics' => $this->getNumberOfTopics()]);

        return $this;
    }

    /**
     * Returns the current number of topics
     *
     * @return int
     */
    public function getNumberOfTopics(): int
    {
        return \count($this->topics);
    }

    /**
     * Returns the topics in the order they were inserted / requested
     *
     * The order is important because SUBACK will return the status code for each topic in this order, without
     * explicitly identifying which topic is which
     *
     * @return \Generator|Topic[]
     */
    public function getTopics(): \Generator
    {
        foreach ($this->topics as $topic) {
            yield $topic;
        }
    }

    /**
     * Sends a PingReq through the client if the client indicates that it is time to do so
     *
     * @param ClientInterface $client
     * @return bool Always true, regardless of whether a ping was sent or not
     */
    protected function checkPingTime(ClientInterface $client): bool
    {
        if ($client->isItPingTime()) {
            $this->logger->info('Pinging is needed, sending PingReq');
            $client->processObject(new PingReq($this->logger));
        }

        return true;
    }
}
219