Subscribe   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 160
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 10
Bugs 1 Features 0
Metric Value
wmc 14
eloc 45
c 10
b 1
f 0
dl 0
loc 160
ccs 50
cts 50
cp 1
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A createPayload() 0 9 2
A checkPingTime() 0 8 2
A initializeObject() 0 4 1
A checkForEvent() 0 16 2
A createVariableHeader() 0 5 1
A shouldExpectAnswer() 0 3 1
A breakLoop() 0 4 1
A loop() 0 28 4
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use DomainException;
8
use Exception;
9
use Generator;
10
use OutOfRangeException;
11
use SplQueue;
12
use unreal4u\MQTT\Application\EmptyReadableResponse;
13
use unreal4u\MQTT\DataTypes\Message;
14
use unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined;
15
use unreal4u\MQTT\Exceptions\MustContainTopic;
16
use unreal4u\MQTT\Exceptions\NotConnected;
17
use unreal4u\MQTT\Internals\ClientInterface;
18
use unreal4u\MQTT\Internals\EventManager;
19
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
20
use unreal4u\MQTT\Internals\ProtocolBase;
21
use unreal4u\MQTT\Internals\ReadableContentInterface;
22
use unreal4u\MQTT\Internals\TopicFilterFunctionality;
23
use unreal4u\MQTT\Internals\WritableContent;
24
use unreal4u\MQTT\Internals\WritableContentInterface;
25
26
use function chr;
27
use function is_callable;
28
use function ord;
29
use function strlen;
30
use function usleep;
31
32
/**
 * The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
 *
 * Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client
 * in order to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE
 * Packet also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to
 * the Client.
 *
 * Besides building the packet itself, this class offers loop(): a generator that sends the subscription through the
 * client and then keeps polling it for incoming events until breakLoop() is called.
 */
final class Subscribe extends ProtocolBase implements WritableContentInterface
{
    use /** @noinspection TraitsPropertiesConflictsInspection */
        WritableContent;
    use PacketIdentifierFunctionality;
    use TopicFilterFunctionality;

    private const CONTROL_PACKET_VALUE = 8;

    /**
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
     * @var bool
     */
    private $shouldLoop = true;

    /**
     * Resets the topic list to an empty queue and then delegates to the parent's initialization
     *
     * @return ProtocolBase
     */
    protected function initializeObject(): ProtocolBase
    {
        $this->topics = new SplQueue();
        return parent::initializeObject();
    }

    /**
     * Builds the variable header, which for this packet is the binary representation of the packet identifier
     *
     * As a side effect the special flags of this object are set to 2.
     *
     * @return string
     * @throws OutOfRangeException
     * @throws Exception
     */
    public function createVariableHeader(): string
    {
        // Subscribe must always send a 2 flag
        $this->specialFlags = 2;
        return $this->getPacketIdentifierBinaryRepresentation();
    }

    /**
     * Builds the payload: for every topic, its topic filter as UTF-8 string followed by one byte with the QoS level
     *
     * @return string
     * @throws MustContainTopic
     * @throws OutOfRangeException
     */
    public function createPayload(): string
    {
        $output = '';
        foreach ($this->getTopics() as $topic) {
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
            $output .= $this->createUTF8String($topic->getTopicFilter()) . chr($topic->getTopicFilterQoSLevel());
        }

        return $output;
    }

    /**
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
     *
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
     * back
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
     * @return bool Always true
     */
    public function shouldExpectAnswer(): bool
    {
        return true;
    }

    /**
     * Performs a check on the socket connection and returns either the contents or an empty object
     *
     * A ping is sent first when the client says one is due. Then a single byte is read from the broker: a non-zero
     * byte is handed to the EventManager for analysis, anything else results in an EmptyReadableResponse.
     *
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws DomainException
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     */
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
    {
        $this->checkPingTime($client);
        $publishPacketControlField = $client->readBrokerData(1);
        $eventManager = new EventManager($this->logger);

        // Evaluate the control field once and reuse it for both the check and the log context
        $controlFieldValue = ord($publishPacketControlField) & 255;
        if ($controlFieldValue > 0) {
            $this->logger->debug('Event received', [
                'ordValue' => $controlFieldValue,
                'length' => strlen($publishPacketControlField),
            ]);
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
        }

        $this->logger->debug('No valid publish packet control field found, returning empty response');
        return new EmptyReadableResponse($this->logger);
    }

    /**
     * Loop and yields different type of results back whenever they are available
     *
     * The subscription itself is sent first; afterwards the optional hook is executed once (it receives the logger)
     * and the polling starts. Only Publish events are yielded, as their message. Whenever there was nothing to
     * yield, the loop sleeps for $idleMicroseconds before checking the socket again.
     *
     * Be aware that 1 second is 1000000 microseconds
     *
     * @param ClientInterface $client
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
     * @param callable|null $hookBeforeLoop
     * @return Generator|Message[]
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     * @throws DomainException
     */
    public function loop(
        ClientInterface $client,
        int $idleMicroseconds = 100000,
        ?callable $hookBeforeLoop = null
    ): Generator {
        $this->shouldLoop = true;
        // First of all: subscribe
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
        $readableContent = $client->processObject($this);

        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
        if (is_callable($hookBeforeLoop)) {
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
            $hookBeforeLoop($this->logger);
        }

        while ($this->shouldLoop === true) {
            $this->logger->debug('++Loop++');
            if ($readableContent instanceof Publish) {
                $readableContent->performSpecialActions($client, $this);
                // Only if we receive a Publish event from the broker, yield the contents
                yield $readableContent->getMessage();
            } else {
                // Only wait for a certain amount of time if there was nothing in the queue
                usleep($idleMicroseconds);
            }

            // breakLoop() may have been called while we were suspended at the yield (or while sleeping): stop right
            // away instead of reading one more event from the socket that would be thrown away afterwards
            if ($this->shouldLoop !== true) {
                break;
            }

            $readableContent = $this->checkForEvent($client);
        }
    }

    /**
     * Call this function to break out of the loop cleanly
     *
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
     * cleanly without the need to disconnect from the broker.
     *
     * @return Subscribe
     */
    public function breakLoop(): self
    {
        $this->shouldLoop = false;
        return $this;
    }

    /**
     * Sends a PingReq through the client whenever the client reports that it is time to ping
     *
     * @param ClientInterface $client
     * @return bool Always true, regardless of whether a ping was sent
     */
    protected function checkPingTime(ClientInterface $client): bool
    {
        if ($client->isItPingTime()) {
            $this->logger->info('Pinging is needed, sending PingReq');
            $client->processObject(new PingReq($this->logger));
        }

        return true;
    }
}
202