Passed
Push — master ( bc3840...800ced )
by Camilo
01:52
created

Subscribe::loop()   B

Complexity

Conditions 4
Paths 6

Size

Total Lines 23
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 23
ccs 0
cts 10
cp 0
rs 8.7972
c 0
b 0
f 0
cc 4
eloc 10
nc 6
nop 2
crap 20
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Internals\ClientInterface;
9
use unreal4u\MQTT\Internals\EventManager;
10
use unreal4u\MQTT\Internals\ProtocolBase;
11
use unreal4u\MQTT\Internals\ReadableContentInterface;
12
use unreal4u\MQTT\Internals\WritableContent;
13
use unreal4u\MQTT\Internals\WritableContentInterface;
14
use unreal4u\MQTT\Protocol\Subscribe\Topic;
15
use unreal4u\MQTT\Utilities;
16
17
final class Subscribe extends ProtocolBase implements WritableContentInterface
18
{
19
    use WritableContent;
20
21
    const CONTROL_PACKET_VALUE = 8;
22
23
    private $packetIdentifier = 0;
24
25
    /**
26
     * An array of topics on which to subscribe to
27
     * @var Topic[]
28
     */
29
    private $topics = [];
30
31
    /**
32
     * @return string
33
     * @throws \OutOfRangeException
34
     * @throws \Exception
35
     */
36
    public function createVariableHeader(): string
37
    {
38
        // Subscribe must always send a 2 flag
39
        $this->specialFlags = 2;
40
41
        // Assign a packet identifier automatically if none has been assigned yet
42
        if ($this->packetIdentifier === 0) {
43
            $this->setPacketIdentifier(random_int(0, 65535));
44
        }
45
46
        return Utilities::convertNumberToBinaryString($this->packetIdentifier);
47
    }
48
49
    public function createPayload(): string
50
    {
51
        $output = '';
52
        foreach ($this->topics as $topic) {
53
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
54
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
55
        }
56
        return $output;
57
    }
58
59
    /**
60
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
61
     *
62
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
63
     * @return bool
64
     */
65
    public function shouldExpectAnswer(): bool
66
    {
67
        return true;
68
    }
69
70
    /**
71
     * SUBSCRIBE Control Packets MUST contain a non-zero 16-bit Packet Identifier
72
     *
73
     * @param int $packetIdentifier
74
     * @return Subscribe
75
     * @throws \OutOfRangeException
76
     */
77
    public function setPacketIdentifier(int $packetIdentifier): Subscribe
78
    {
79
        if ($packetIdentifier > 65535 || $packetIdentifier < 1) {
80
            throw new \OutOfRangeException('Packet identifier must fit within 2 bytes');
81
        }
82
83
        $this->packetIdentifier = $packetIdentifier;
84
85
        return $this;
86
    }
87
88
    public function getPacketIdentifier(): int
89
    {
90
        return $this->packetIdentifier;
91
    }
92
93
    /**
94
     * Performs a check on the socket connection and returns either the contents or an empty object
95
     *
96
     * @param ClientInterface $client
97
     * @return ReadableContentInterface
98
     * @throws \DomainException
99
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
100
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
101
     */
102
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
103
    {
104
        $publishPacketControlField = $client->readSocketData(1);
105
        $eventManager = new EventManager($this->logger);
106
107
        if ((\ord($publishPacketControlField) & 255) > 0) {
108
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
109
        }
110
111
        $this->logger->debug('No valid publish packet control field found, returning empty response');
112
        return new EmptyReadableResponse($this->logger);
113
    }
114
115
    /**
116
     * Performs a simple loop and yields results back whenever they are available
117
     *
118
     * @param ClientInterface $client
119
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
120
     * @return \Generator
121
     * @throws \DomainException
122
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
123
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
124
     */
125
    public function loop(ClientInterface $client, int $idleMicroseconds = 100000): \Generator
126
    {
127
        // First of all: subscribe
128
        $client->setBlocking(true);
129
        $readableContent = $client->sendData($this);
130
        /**
131
         * The Server is permitted to start sending PUBLISH packets matching the Subscription before it sends the
132
         * SUBACK Packet.
133
         */
134
        if ($readableContent instanceof Publish) {
135
            yield $readableContent->getMessage();
136
        }
137
138
        // After we are successfully subscribed, start to listen for events
139
        while (true) {
140
            $readableContent = $this->checkForEvent($client);
141
142
            // Only if we receive a Publish event from the broker, yield the contents
143
            if ($readableContent instanceof Publish) {
144
                yield $readableContent->getMessage();
145
            } else {
146
                // Only wait for a certain amount of time if there was nothing in the queue
147
                usleep($idleMicroseconds);
148
            }
149
        }
150
    }
151
152
    /**
153
     * A subscription is based on filters, this function allows us to pass on filters
154
     *
155
     * @param Topic[] $topics
156
     * @return Subscribe
157
     */
158
    public function addTopics(Topic ...$topics): Subscribe
159
    {
160
        $this->topics = $topics;
161
        $this->logger->debug('One or more topics have been added', ['totalTopics', count($this->topics)]);
162
163
        return $this;
164
    }
165
}
166