Completed
Push — master ( 4ca550...f445a9 )
by Camilo
02:08
created

Subscribe::setPacketIdentifier()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 0
cts 6
cp 0
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 2
nop 1
crap 12
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\DataTypes\Topic;
9
use unreal4u\MQTT\DataTypes\PacketIdentifier;
10
use unreal4u\MQTT\Internals\ClientInterface;
11
use unreal4u\MQTT\Internals\EventManager;
12
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
13
use unreal4u\MQTT\Internals\ProtocolBase;
14
use unreal4u\MQTT\Internals\ReadableContentInterface;
15
use unreal4u\MQTT\Internals\WritableContent;
16
use unreal4u\MQTT\Internals\WritableContentInterface;
17
18
/**
 * The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
 *
 * Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client
 * in order to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE
 * Packet also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to
 * the Client.
 */
final class Subscribe extends ProtocolBase implements WritableContentInterface
{
    use WritableContent, PacketIdentifierFunctionality;

    const CONTROL_PACKET_VALUE = 8;

    /**
     * The topics to subscribe to, in the order in which they were handed to addTopics()
     * @var Topic[]
     */
    private $topics = [];

    /**
     * Flag checked on every iteration of loop(); breakLoop() sets it to false so the loop ends cleanly without
     * disconnecting from the broker
     * @var bool
     */
    private $shouldLoop = true;

    /**
     * Creates the variable header, which consists of the binary representation of the packet identifier
     *
     * As a side effect, this sets the special flags to 2 and assigns a random packet identifier when none is set.
     *
     * @return string
     * @throws \OutOfRangeException
     * @throws \Exception
     */
    public function createVariableHeader(): string
    {
        // Subscribe must always send a 2 flag
        $this->specialFlags = 2;

        // Assign a packet identifier automatically if none has been assigned yet.
        // The previous guard compared the property against the integer 0, which static analysis reported as a
        // condition that can never be true. Checking for the value object itself works regardless of what the
        // "not yet assigned" default of the property is.
        if (!($this->packetIdentifier instanceof PacketIdentifier)) {
            $this->setPacketIdentifier(new PacketIdentifier(random_int(1, 65535)));
        }

        return $this->getPacketIdentifierBinaryRepresentation();
    }

    /**
     * Creates the payload: for every topic, the UTF-8 encoded topic name followed by one byte with the QoS level
     *
     * @return string
     * @throws \OutOfRangeException
     */
    public function createPayload(): string
    {
        $payload = '';
        foreach ($this->getTopics() as $topic) {
            $encodedTopicName = $this->createUTF8String($topic->getTopicName());
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
            $payload .= $encodedTopicName . \chr($topic->getTopicQoSLevel());
        }

        return $payload;
    }

    /**
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
     *
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
     * back
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
     * @return bool Always true
     */
    public function shouldExpectAnswer(): bool
    {
        return true;
    }

    /**
     * Performs a check on the socket connection and returns either the contents or an empty object
     *
     * A PingReq is sent first when the client reports that it is time to do so. Afterwards the control field is
     * requested from the client: any non-zero value is handed to the EventManager for analysis, a zero value results
     * in an EmptyReadableResponse.
     *
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \DomainException
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
    {
        $this->checkPingTime($client);
        $publishPacketControlField = $client->readBrokerData(1);
        $eventManager = new EventManager($this->logger);

        // Calculated once: used both for the decision below and as logging context
        $controlFieldValue = \ord($publishPacketControlField) & 255;
        if ($controlFieldValue > 0) {
            $this->logger->debug('Event received', [
                'ordValue' => $controlFieldValue,
                'length' => \strlen($publishPacketControlField)
            ]);

            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
        }

        $this->logger->debug('No valid publish packet control field found, returning empty response');
        return new EmptyReadableResponse($this->logger);
    }

    /**
     * Subscribes and then keeps yielding the messages of incoming Publish packets until breakLoop() is called
     *
     * @param ClientInterface $client
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
     * @param callable|null $hookBeforeLoop Executed once (with the logger as only argument) before the loop starts
     * @return \Generator
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     * @throws \DomainException
     */
    public function loop(
        ClientInterface $client,
        int $idleMicroseconds = 100000,
        callable $hookBeforeLoop = null
    ): \Generator {
        // Reset the flag so that a previous call to breakLoop() does not prevent a new loop from running
        $this->shouldLoop = true;
        // First of all: subscribe
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
        $readableContent = $client->processObject($this);

        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
        if (\is_callable($hookBeforeLoop)) {
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
            $hookBeforeLoop($this->logger);
        }

        while ($this->shouldLoop === true) {
            $this->logger->debug('++Loop++');
            if ($readableContent instanceof Publish) {
                $readableContent->performSpecialActions($client, $this);
                // Only if we receive a Publish event from the broker, yield the contents
                yield $readableContent->getMessage();
            } else {
                // Only wait for a certain amount of time if there was nothing in the queue
                usleep($idleMicroseconds);
            }

            $readableContent = $this->checkForEvent($client);
        }
    }

    /**
     * Call this function to break out of the loop cleanly
     *
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
     * cleanly without the need to disconnect from the broker.
     *
     * @return Subscribe
     */
    public function breakLoop(): self
    {
        $this->shouldLoop = false;
        return $this;
    }

    /**
     * A subscription is based on filters, this function allows us to pass on filters
     *
     * Be aware that the given topics replace any topics that were stored by a previous call to this function.
     *
     * @param Topic[] $topics
     * @return Subscribe
     */
    public function addTopics(Topic ...$topics): Subscribe
    {
        $this->topics = $topics;
        // The context must be a key => value pair; a list with the label and the number as separate elements loses
        // the relation between both
        $this->logger->debug('Topics added', ['totalTopics' => $this->getNumberOfTopics()]);

        return $this;
    }

    /**
     * Returns the current number of topics
     *
     * @return int
     */
    public function getNumberOfTopics(): int
    {
        return \count($this->topics);
    }

    /**
     * Returns the topics in the order they were inserted / requested
     *
     * The order is important because SUBACK will return the status code for each topic in this order, without
     * explicitly identifying which topic is which
     *
     * @return \Generator|Topic[]
     */
    public function getTopics(): \Generator
    {
        foreach ($this->topics as $topic) {
            yield $topic;
        }
    }

    /**
     * Sends a PingReq through the client when the client indicates that it is time to ping
     *
     * @param ClientInterface $client
     * @return bool Always true, independent of whether a PingReq was sent
     */
    protected function checkPingTime(ClientInterface $client): bool
    {
        if ($client->isItPingTime()) {
            $this->logger->info('Pinging is needed, sending PingReq');
            $client->processObject(new PingReq($this->logger));
        }

        return true;
    }
}
229