Passed
Push — master ( 33ffba...f55639 )
by Camilo
02:44
created

Subscribe::addTopics()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 4
cts 4
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\DataTypes\Topic;
9
use unreal4u\MQTT\Internals\ClientInterface;
10
use unreal4u\MQTT\Internals\EventManager;
11
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
12
use unreal4u\MQTT\Internals\ProtocolBase;
13
use unreal4u\MQTT\Internals\ReadableContentInterface;
14
use unreal4u\MQTT\Internals\WritableContent;
15
use unreal4u\MQTT\Internals\WritableContentInterface;
16
17
/**
18
 * The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
19
 *
20
 * Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client
21
 * in order to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE
22
 * Packet also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to
23
 * the Client.
24
 */
25
final class Subscribe extends ProtocolBase implements WritableContentInterface
{
    use WritableContent, PacketIdentifierFunctionality;

    const CONTROL_PACKET_VALUE = 8;

    /**
     * Topics to subscribe to, stored in the order in which they were added
     * @var Topic[]
     */
    private $topics = [];

    /**
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
     * @var bool
     */
    private $shouldLoop = true;

    /**
     * Builds the variable header, which for this packet consists of the packet identifier only
     *
     * As a side effect this sets the special flags to 2 before the identifier is generated.
     *
     * @return string
     * @throws \OutOfRangeException
     * @throws \Exception
     */
    public function createVariableHeader(): string
    {
        // Subscribe must always send a 2 flag
        // NOTE(review): presumably read back when the fixed header is assembled elsewhere - confirm in WritableContent
        $this->specialFlags = 2;
        return $this->getPacketIdentifierBinaryRepresentation();
    }

    /**
     * Builds the payload: for every topic, its name as an UTF-8 string followed by one byte with the QoS level
     *
     * Topics are written in the order returned by getTopics(). Without any topics an empty string is returned.
     *
     * @return string
     * @throws \OutOfRangeException
     */
    public function createPayload(): string
    {
        $output = '';
        foreach ($this->getTopics() as $topic) {
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
        }
        return $output;
    }

    /**
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
     *
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
     * back
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
     * @return bool Always true
     */
    public function shouldExpectAnswer(): bool
    {
        return true;
    }

    /**
     * Performs a check on the socket connection and returns either the contents or an empty object
     *
     * The ping check is always executed first. After that, the broker data requested with readBrokerData(1) is
     * inspected: when the ordinal value of its first byte is greater than zero the data is handed over to the
     * EventManager, otherwise an EmptyReadableResponse is returned.
     *
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \DomainException
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
    {
        $this->checkPingTime($client);
        $publishPacketControlField = $client->readBrokerData(1);
        $eventManager = new EventManager($this->logger);

        // Calculated once: the same value decides whether there is an event and is written to the log
        $controlFieldValue = \ord($publishPacketControlField) & 255;
        if ($controlFieldValue > 0) {
            $this->logger->debug('Event received', [
                'ordValue' => $controlFieldValue,
                'length' => \strlen($publishPacketControlField)
            ]);
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
        }

        $this->logger->debug('No valid publish packet control field found, returning empty response');
        return new EmptyReadableResponse($this->logger);
    }

    /**
     * Loop and yields different type of results back whenever they are available
     *
     * The subscription itself is sent first, then the optional hook is executed once (it receives the logger as its
     * only argument) and finally the loop starts. Only messages from Publish packets are yielded; on any other
     * response the loop sleeps for $idleMicroseconds before checking the connection again.
     *
     * The loop flag is reset to true on every call, so a breakLoop() issued before calling this function has no
     * effect. A breakLoop() issued while looping takes effect the next time the loop condition is evaluated.
     *
     * Be aware that 1 second is 1000000 microseconds
     *
     * @param ClientInterface $client
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
     * @param callable|null $hookBeforeLoop
     * @return \Generator
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     * @throws \DomainException
     */
    public function loop(
        ClientInterface $client,
        int $idleMicroseconds = 100000,
        callable $hookBeforeLoop = null
    ): \Generator {
        $this->shouldLoop = true;
        // First of all: subscribe
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
        $readableContent = $client->processObject($this);

        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
        if (\is_callable($hookBeforeLoop)) {
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
            $hookBeforeLoop($this->logger);
        }

        while ($this->shouldLoop === true) {
            $this->logger->debug('++Loop++');
            if ($readableContent instanceof Publish) {
                $readableContent->performSpecialActions($client, $this);
                // Only if we receive a Publish event from the broker, yield the contents
                yield $readableContent->getMessage();
            } else {
                // Only wait for a certain amount of time if there was nothing in the queue
                usleep($idleMicroseconds);
            }

            $readableContent = $this->checkForEvent($client);
        }
    }

    /**
     * Call this function to break out of the loop cleanly
     *
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
     * cleanly without the need to disconnect from the broker.
     *
     * @return Subscribe
     */
    public function breakLoop(): self
    {
        $this->shouldLoop = false;
        return $this;
    }

    /**
     * A subscription is based on filters, this function allows us to pass on filters
     *
     * The given topics are appended to the ones that were added before; nothing is replaced.
     *
     * @param Topic ...$topics
     * @return Subscribe
     */
    public function addTopics(Topic ...$topics): Subscribe
    {
        $this->topics = array_merge($this->topics, $topics);
        // The context must be a key => value map, otherwise the counter ends up in the log without its label
        $this->logger->debug('Topics added', ['totalTopics' => $this->getNumberOfTopics()]);

        return $this;
    }

    /**
     * Returns the current number of topics
     *
     * @return int
     */
    public function getNumberOfTopics(): int
    {
        return count($this->topics);
    }

    /**
     * Returns the topics in the order they were inserted / requested
     *
     * The order is important because SUBACK will return the status code for each topic in this order, without
     * explicitly identifying which topic is which
     *
     * @return \Generator|Topic[]
     */
    public function getTopics(): \Generator
    {
        foreach ($this->topics as $topic) {
            yield $topic;
        }
    }

    /**
     * Sends a PingReq through the client whenever the client indicates that it is time to do so
     *
     * @param ClientInterface $client
     * @return bool Always true, regardless of whether a ping was sent
     */
    protected function checkPingTime(ClientInterface $client): bool
    {
        if ($client->isItPingTime()) {
            $this->logger->info('Pinging is needed, sending PingReq');
            $client->processObject(new PingReq($this->logger));
        }

        return true;
    }
}
221