Passed
Push — master ( 0ad4bf...cebcdc )
by Camilo
02:13
created

Subscribe::createVariableHeader()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 3
cts 3
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\DataTypes\Topic;
9
use unreal4u\MQTT\Internals\ClientInterface;
10
use unreal4u\MQTT\Internals\EventManager;
11
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
12
use unreal4u\MQTT\Internals\ProtocolBase;
13
use unreal4u\MQTT\Internals\ReadableContentInterface;
14
use unreal4u\MQTT\Internals\WritableContent;
15
use unreal4u\MQTT\Internals\WritableContentInterface;
16
17
/**
18
 * The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
19
 *
20
 * Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client
21
 * in order to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE
22
 * Packet also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to
23
 * the Client.
24
 */
25
final class Subscribe extends ProtocolBase implements WritableContentInterface
26
{
27
    use WritableContent, PacketIdentifierFunctionality;
28
29
    const CONTROL_PACKET_VALUE = 8;
30
31
    /**
32
     * An array of topics on which to subscribe to
33
     * @var Topic[]
34
     */
35
    private $topics = [];
36
37
    /**
38
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
39
     * @var bool
40
     */
41
    private $shouldLoop = true;
42
43
    /**
44
     * @return string
45
     * @throws \OutOfRangeException
46
     * @throws \Exception
47
     */
48 2
    public function createVariableHeader(): string
49
    {
50
        // Subscribe must always send a 2 flag
51 2
        $this->specialFlags = 2;
52 2
        return $this->getPacketIdentifierBinaryRepresentation();
53
    }
54
55
    /**
56
     * @return string
57
     * @throws \OutOfRangeException
58
     */
59 1
    public function createPayload(): string
60
    {
61 1
        $output = '';
62 1
        foreach ($this->getTopics() as $topic) {
63
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
64 1
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
65
        }
66 1
        return $output;
67
    }
68
69
    /**
70
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
71
     *
72
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
73
     * back
74
     *
75
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
76
     * @return bool
77
     */
78 1
    public function shouldExpectAnswer(): bool
79
    {
80 1
        return true;
81
    }
82
83
    /**
84
     * Performs a check on the socket connection and returns either the contents or an empty object
85
     *
86
     * @param ClientInterface $client
87
     * @return ReadableContentInterface
88
     * @throws \DomainException
89
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
90
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
91
     */
92 2
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
93
    {
94 2
        $this->checkPingTime($client);
95 2
        $publishPacketControlField = $client->readBrokerData(1);
96 2
        $eventManager = new EventManager($this->logger);
97
98 2
        if ((\ord($publishPacketControlField) & 255) > 0) {
99 1
            $this->logger->debug('Event received', [
100 1
                'ordValue' => \ord($publishPacketControlField) & 255,
101 1
                'length' => \strlen($publishPacketControlField)
102
            ]);
103 1
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
104
        }
105
106 1
        $this->logger->debug('No valid publish packet control field found, returning empty response');
107 1
        return new EmptyReadableResponse($this->logger);
108
    }
109
110
    /**
111
     * Loop and yields different type of results back whenever they are available
112
     *
113
     * @param ClientInterface $client
114
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
115
     * @param callable|null $hookBeforeLoop
116
     * @return \Generator
117
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
118
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
119
     * @throws \DomainException
120
     */
121
    public function loop(
122
        ClientInterface $client,
123
        int $idleMicroseconds = 100000,
124
        callable $hookBeforeLoop = null
125
    ): \Generator {
126
        $this->shouldLoop = true;
127
        // First of all: subscribe
128
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
129
        $readableContent = $client->processObject($this);
130
131
        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
132
        if (\is_callable($hookBeforeLoop)) {
133
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
134
            $hookBeforeLoop($this->logger);
135
        }
136
137
        while ($this->shouldLoop === true) {
138
            $this->logger->debug('++Loop++');
139
            if ($readableContent instanceof Publish) {
140
                $readableContent->performSpecialActions($client, $this);
141
                // Only if we receive a Publish event from the broker, yield the contents
142
                yield $readableContent->getMessage();
143
            } else {
144
                // Only wait for a certain amount of time if there was nothing in the queue
145
                #$this->logger->debug('Disregarding', [
146
                #    'class' => \get_class($readableContent),
147
                #]);
148
                usleep($idleMicroseconds);
149
            }
150
151
            $readableContent = $this->checkForEvent($client);
152
        }
153
    }
154
155
    /**
156
     * Call this function to break out of the loop cleanly
157
     *
158
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
159
     * cleanly without the need to disconnect from the broker.
160
     *
161
     * @return Subscribe
162
     */
163
    public function breakLoop(): self
164
    {
165
        $this->shouldLoop = false;
166
        return $this;
167
    }
168
169
    /**
170
     * A subscription is based on filters, this function allows us to pass on filters
171
     *
172
     * @param Topic[] $topics
173
     * @return Subscribe
174
     */
175 4
    public function addTopics(Topic ...$topics): Subscribe
176
    {
177 4
        $this->topics = array_merge($this->topics, $topics);
178 4
        $this->logger->debug('Topics added', ['totalTopics', $this->getNumberOfTopics()]);
179
180 4
        return $this;
181
    }
182
183
    /**
184
     * Returns the current number of topics
185
     *
186
     * @return int
187
     */
188 4
    public function getNumberOfTopics(): int
189
    {
190 4
        return count($this->topics);
191
    }
192
193
    /**
194
     * Returns the topics in the order they were inserted / requested
195
     *
196
     * The order is important because SUBACK will return the status code for each topic in this order, without
197
     * explicitly identifying which topic is which
198
     *
199
     * @return \Generator|Topic[]
200
     */
201 1
    public function getTopics(): \Generator
202
    {
203 1
        foreach ($this->topics as $topic) {
204 1
            yield $topic;
205
        }
206 1
    }
207
208
    /**
209
     * @param ClientInterface $client
210
     * @return bool
211
     */
212 2
    protected function checkPingTime(ClientInterface $client): bool
213
    {
214 2
        if ($client->isItPingTime()) {
215
            $this->logger->info('Pinging is needed, sending PingReq');
216
            $client->processObject(new PingReq($this->logger));
217
        }
218
219 2
        return true;
220
    }
221
}
222