Passed
Push — master ( d71bca...578a67 )
by Camilo
02:06
created

Subscribe::initializeObject()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use DomainException;
8
use Exception;
9
use Generator;
10
use OutOfRangeException;
11
use SplQueue;
12
use unreal4u\MQTT\Application\EmptyReadableResponse;
13
use unreal4u\MQTT\DataTypes\Message;
14
use unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined;
15
use unreal4u\MQTT\Exceptions\MustContainTopic;
16
use unreal4u\MQTT\Exceptions\NotConnected;
17
use unreal4u\MQTT\Internals\ClientInterface;
18
use unreal4u\MQTT\Internals\EventManager;
19
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
20
use unreal4u\MQTT\Internals\ProtocolBase;
21
use unreal4u\MQTT\Internals\ReadableContentInterface;
22
use unreal4u\MQTT\Internals\TopicFilterFunctionality;
23
use unreal4u\MQTT\Internals\WritableContent;
24
use unreal4u\MQTT\Internals\WritableContentInterface;
25
26
use function chr;
27
use function is_callable;
28
use function ord;
29
use function strlen;
30
use function usleep;
31
32
/**
 * The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
 *
 * Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client
 * in order to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE
 * Packet also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to
 * the Client.
 */
final class Subscribe extends ProtocolBase implements WritableContentInterface
{
    use /** @noinspection TraitsPropertiesConflictsInspection */
        WritableContent;
    use PacketIdentifierFunctionality;
    use TopicFilterFunctionality;

    private const CONTROL_PACKET_VALUE = 8;

    /**
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
     *
     * Reset to true every time loop() starts running and set to false by breakLoop().
     *
     * @var bool
     */
    private $shouldLoop = true;

    /**
     * Prepares the (initially empty) queue of topics before delegating to the parent initialization
     *
     * @return ProtocolBase
     */
    protected function initializeObject(): ProtocolBase
    {
        $this->topics = new SplQueue();
        return parent::initializeObject();
    }

    /**
     * Creates the variable header, which for this packet is the binary representation of the packet identifier
     *
     * As a side effect, this also sets the special flags that belong to this packet.
     *
     * @return string
     * @throws OutOfRangeException
     * @throws Exception
     */
    public function createVariableHeader(): string
    {
        // Subscribe must always send a 2 flag
        $this->specialFlags = 2;
        return $this->getPacketIdentifierBinaryRepresentation();
    }

    /**
     * Creates the payload: for each topic, the UTF-8 encoded topic filter followed by one byte with its QoS level
     *
     * @return string Will be an empty string if getTopics() does not yield any topic
     * @throws MustContainTopic
     * @throws OutOfRangeException
     */
    public function createPayload(): string
    {
        $output = '';
        foreach ($this->getTopics() as $topic) {
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
            $output .= $this->createUTF8String($topic->getTopicFilter()) . chr($topic->getTopicFilterQoSLevel());
        }

        return $output;
    }

    /**
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
     *
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
     * back
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
     * @return bool Always true
     */
    public function shouldExpectAnswer(): bool
    {
        return true;
    }

    /**
     * Performs a check on the socket connection and returns either the contents or an empty object
     *
     * A ping request is sent first if the client reports that one is due. After that, one byte is read from the
     * broker: a non-zero byte is handed over to the EventManager for further analysis, anything else (including an
     * empty read) results in an EmptyReadableResponse.
     *
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws DomainException
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     */
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
    {
        $this->checkPingTime($client);
        $publishPacketControlField = $client->readBrokerData(1);
        $eventManager = new EventManager($this->logger);

        // ord() already returns a value between 0 and 255, so it is calculated once and no extra bitmask is needed
        $controlFieldValue = ord($publishPacketControlField);
        if ($controlFieldValue > 0) {
            $this->logger->debug('Event received', [
                'ordValue' => $controlFieldValue,
                'length' => strlen($publishPacketControlField)
            ]);
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
        }

        $this->logger->debug('No valid publish packet control field found, returning empty response');
        return new EmptyReadableResponse($this->logger);
    }

    /**
     * Loop and yields different type of results back whenever they are available
     *
     * Be aware that 1 second is 1000000 microseconds
     *
     * As this is a generator, nothing will be executed until the first iteration is requested. At that point, this
     * object is sent through the client, the optional hook is executed once and the loop itself begins: Publish
     * objects get their special actions performed and their message yielded, any other type of content makes the loop
     * sleep for the given amount of microseconds before the socket is checked again.
     *
     * @param ClientInterface $client
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
     * @param callable|null $hookBeforeLoop Executed once before looping, receives the logger as its only argument
     * @return Generator|Message[]
     * @throws NotConnected
     * @throws NoConnectionParametersDefined
     * @throws DomainException
     */
    public function loop(
        ClientInterface $client,
        int $idleMicroseconds = 100000,
        ?callable $hookBeforeLoop = null
    ): Generator {
        $this->shouldLoop = true;
        // First of all: subscribe
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
        $readableContent = $client->processObject($this);

        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
        if (is_callable($hookBeforeLoop)) {
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
            $hookBeforeLoop($this->logger);
        }

        while ($this->shouldLoop === true) {
            $this->logger->debug('++Loop++');
            if ($readableContent instanceof Publish) {
                $readableContent->performSpecialActions($client, $this);
                // Only if we receive a Publish event from the broker, yield the contents
                yield $readableContent->getMessage();
            } else {
                // Only wait for a certain amount of time if there was nothing in the queue
                usleep($idleMicroseconds);
            }

            $readableContent = $this->checkForEvent($client);
        }
    }

    /**
     * Call this function to break out of the loop cleanly
     *
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
     * cleanly without the need to disconnect from the broker.
     *
     * @return Subscribe
     */
    public function breakLoop(): self
    {
        $this->shouldLoop = false;
        return $this;
    }

    /**
     * Sends a PingReq through the client whenever the client indicates that it is time to do so
     *
     * @param ClientInterface $client
     * @return bool Always true, regardless of whether a ping was sent or not
     */
    protected function checkPingTime(ClientInterface $client): bool
    {
        if ($client->isItPingTime()) {
            $this->logger->info('Pinging is needed, sending PingReq');
            $client->processObject(new PingReq($this->logger));
        }

        return true;
    }
}
202