Passed
Push — master ( d36af8...d37c8b )
by Camilo
02:15
created

Subscribe::createPayload()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 5
cts 5
cp 1
rs 9.6666
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Internals\ClientInterface;
9
use unreal4u\MQTT\Internals\EventManager;
10
use unreal4u\MQTT\Internals\PacketIdentifierFunctionality;
11
use unreal4u\MQTT\Internals\ProtocolBase;
12
use unreal4u\MQTT\Internals\ReadableContentInterface;
13
use unreal4u\MQTT\Internals\TopicFunctionality;
14
use unreal4u\MQTT\Internals\WritableContent;
15
use unreal4u\MQTT\Internals\WritableContentInterface;
16
17
/**
18
 * The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
19
 *
20
 * Each Subscription registers a Client’s interest in one or more Topics. The Server sends PUBLISH Packets to the Client
21
 * in order to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE
22
 * Packet also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to
23
 * the Client.
24
 */
25
final class Subscribe extends ProtocolBase implements WritableContentInterface
26
{
27
    use WritableContent, PacketIdentifierFunctionality, TopicFunctionality;
28
29
    const CONTROL_PACKET_VALUE = 8;
30
31
    /**
32
     * Indicates whether to continue the loop or break it at any point, cleanly without disconnecting from the broker
33
     * @var bool
34
     */
35
    private $shouldLoop = true;
36
37 10
    protected function initializeObject(): ProtocolBase
38
    {
39 10
        $this->topics = new \SplQueue();
40 10
        return parent::initializeObject();
41
    }
42
43
    /**
44
     * @return string
45
     * @throws \OutOfRangeException
46
     * @throws \Exception
47
     */
48 2
    public function createVariableHeader(): string
49
    {
50
        // Subscribe must always send a 2 flag
51 2
        $this->specialFlags = 2;
52 2
        return $this->getPacketIdentifierBinaryRepresentation();
53
    }
54
55
    /**
56
     * @return string
57
     * @throws \unreal4u\MQTT\Exceptions\MustContainTopic
58
     * @throws \OutOfRangeException
59
     */
60 1
    public function createPayload(): string
61
    {
62 1
        $output = '';
63 1
        foreach ($this->getTopics() as $topic) {
64
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
65 1
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
66
        }
67
68 1
        return $output;
69
    }
70
71
    /**
72
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
73
     *
74
     * This can however not be in the same order, as we may be able to receive PUBLISH packets before getting a SUBACK
75
     * back
76
     *
77
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
78
     * @return bool
79
     */
80 1
    public function shouldExpectAnswer(): bool
81
    {
82 1
        return true;
83
    }
84
85
    /**
86
     * Performs a check on the socket connection and returns either the contents or an empty object
87
     *
88
     * @param ClientInterface $client
89
     * @return ReadableContentInterface
90
     * @throws \DomainException
91
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
92
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
93
     */
94 4
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
95
    {
96 4
        $this->checkPingTime($client);
97 4
        $publishPacketControlField = $client->readBrokerData(1);
98 4
        $eventManager = new EventManager($this->logger);
99
100 4
        if ((\ord($publishPacketControlField) & 255) > 0) {
101 3
            $this->logger->debug('Event received', [
102 3
                'ordValue' => \ord($publishPacketControlField) & 255,
103 3
                'length' => \strlen($publishPacketControlField)
104
            ]);
105 3
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
106
        }
107
108 3
        $this->logger->debug('No valid publish packet control field found, returning empty response');
109 3
        return new EmptyReadableResponse($this->logger);
110
    }
111
112
    /**
113
     * Loop and yields different type of results back whenever they are available
114
     *
115
     * Be aware that 1 second is 1000000 microseconds
116
     *
117
     * @param ClientInterface $client
118
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
119
     * @param callable|null $hookBeforeLoop
120
     * @return \Generator
121
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
122
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
123
     * @throws \DomainException
124
     */
125 2
    public function loop(
126
        ClientInterface $client,
127
        int $idleMicroseconds = 100000,
128
        callable $hookBeforeLoop = null
129
    ): \Generator {
130 2
        $this->shouldLoop = true;
131
        // First of all: subscribe
132 2
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
133 2
        $readableContent = $client->processObject($this);
134
135
        // Allow the user to do certain stuff before looping, for example: an Unsubscribe
136 2
        if (\is_callable($hookBeforeLoop)) {
137 1
            $this->logger->notice('Callable detected, executing', ['userFunctionName' => $hookBeforeLoop]);
138 1
            $hookBeforeLoop($this->logger);
139
        }
140
141 2
        while ($this->shouldLoop === true) {
142 2
            $this->logger->debug('++Loop++');
143 2
            if ($readableContent instanceof Publish) {
144 2
                $readableContent->performSpecialActions($client, $this);
145
                // Only if we receive a Publish event from the broker, yield the contents
146 2
                yield $readableContent->getMessage();
147
            } else {
148
                // Only wait for a certain amount of time if there was nothing in the queue
149 2
                usleep($idleMicroseconds);
150
            }
151
152 2
            $readableContent = $this->checkForEvent($client);
153
        }
154 2
    }
155
156
    /**
157
     * Call this function to break out of the loop cleanly
158
     *
159
     * There is no way to know on which topics we are still subscribed on. This function lets us exit the above loop
160
     * cleanly without the need to disconnect from the broker.
161
     *
162
     * @return Subscribe
163
     */
164 2
    public function breakLoop(): self
165
    {
166 2
        $this->shouldLoop = false;
167 2
        return $this;
168
    }
169
170
    /**
171
     * @param ClientInterface $client
172
     * @return bool
173
     */
174 5
    protected function checkPingTime(ClientInterface $client): bool
175
    {
176 5
        if ($client->isItPingTime()) {
177 1
            $this->logger->info('Pinging is needed, sending PingReq');
178 1
            $client->processObject(new PingReq($this->logger));
179
        }
180
181 5
        return true;
182
    }
183
}
184