Passed
Push — master ( 56612a...1b3824 )
by Camilo
09:25
created

UnSubscribe::loop()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 21
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 21
rs 9.3142
c 0
b 0
f 0
cc 3
eloc 12
nc 3
nop 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Internals\ClientInterface;
9
use unreal4u\MQTT\Internals\EventManager;
10
use unreal4u\MQTT\Internals\ProtocolBase;
11
use unreal4u\MQTT\Internals\ReadableContentInterface;
12
use unreal4u\MQTT\Internals\WritableContent;
13
use unreal4u\MQTT\Internals\WritableContentInterface;
14
use unreal4u\MQTT\Protocol\Subscribe\Topic;
15
use unreal4u\MQTT\Utilities;
16
17
/**
 * UNSUBSCRIBE control packet: asks the broker to drop one or more topic filters for this client
 */
final class UnSubscribe extends ProtocolBase implements WritableContentInterface
{
    use WritableContent;

    /**
     * Control packet type of UNSUBSCRIBE as defined by MQTT 3.1.1
     */
    const CONTROL_PACKET_VALUE = 10;

    /**
     * Packet identifier that goes into the variable header; 0 means none has been assigned yet
     * @var int
     */
    private $packetIdentifier = 0;

    /**
     * The topics to unsubscribe from
     * @var Topic[]
     */
    private $topics = [];

    /**
     * Creates the variable header out of the packet identifier, generating a random identifier when none was set
     *
     * @return string
     * @throws \OutOfRangeException
     * @throws \Exception When random_int() is unable to find a suitable source of randomness
     */
    public function createVariableHeader(): string
    {
        // MQTT 3.1.1 reserves the fixed header flag bits of UNSUBSCRIBE as 0010 (decimal 2)
        // NOTE(review): presumably picked up by the fixed header creation in WritableContent; confirm
        $this->specialFlags = 2;

        // Assign a packet identifier automatically if none has been assigned yet. The lower bound must be 1:
        // setPacketIdentifier() rejects 0, so drawing a 0 would end in a sporadic OutOfRangeException
        if ($this->packetIdentifier === 0) {
            $this->setPacketIdentifier(random_int(1, 65535));
        }

        return Utilities::convertNumberToBinaryString($this->packetIdentifier);
    }

    /**
     * Creates the payload: every topic name passed through createUTF8String(), concatenated in the given order
     *
     * Unlike SUBSCRIBE, an UNSUBSCRIBE payload does not carry a requested QoS byte after each topic filter
     * (MQTT 3.1.1, section 3.10.3), so only the topic names are written out.
     *
     * @return string
     */
    public function createPayload(): string
    {
        $output = '';
        foreach ($this->topics as $topic) {
            $output .= $this->createUTF8String($topic->getTopicName());
        }

        return $output;
    }

    /**
     * The Server MUST respond to an UNSUBSCRIBE request by sending an UNSUBACK packet (MQTT 3.1.1, section 3.10.4)
     *
     * @return bool
     */
    public function shouldExpectAnswer(): bool
    {
        return true;
    }

    /**
     * UNSUBSCRIBE Control Packets MUST contain a non-zero 16-bit Packet Identifier
     *
     * @param int $packetIdentifier A number between 1 and 65535, both inclusive
     * @return self
     * @throws \OutOfRangeException When the identifier is 0, negative or does not fit within 2 bytes
     */
    public function setPacketIdentifier(int $packetIdentifier): self
    {
        if ($packetIdentifier > 65535 || $packetIdentifier < 1) {
            throw new \OutOfRangeException('Packet identifier must fit within 2 bytes');
        }

        $this->packetIdentifier = $packetIdentifier;
        $this->logger->debug('Setting packet identifier', ['current' => $this->packetIdentifier]);

        return $this;
    }

    /**
     * Returns the current packet identifier, which is 0 for as long as none has been assigned
     *
     * @return int
     */
    public function getPacketIdentifier(): int
    {
        return $this->packetIdentifier;
    }

    /**
     * Performs a check on the socket connection and returns either the contents or an empty object
     *
     * Sends a ping request first if the client reports one is due, then reads one byte from the socket. A non-zero
     * byte is handed to the EventManager for further analysis; anything else results in an EmptyReadableResponse.
     *
     * @param ClientInterface $client
     * @return ReadableContentInterface
     * @throws \DomainException
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function checkForEvent(ClientInterface $client): ReadableContentInterface
    {
        $this->checkPingTime($client);
        $publishPacketControlField = $client->readSocketData(1);
        $eventManager = new EventManager($this->logger);

        // Calculated once: the same value is used for logging and for the decision below
        $ordValue = \ord($publishPacketControlField) & 255;
        $this->logger->debug('Checking event', ['ordValue' => $ordValue]);
        if ($ordValue > 0) {
            return $eventManager->analyzeHeaders($publishPacketControlField, $client);
        }

        $this->logger->debug('No valid publish packet control field found, returning empty response');
        return new EmptyReadableResponse($this->logger);
    }

    /**
     * Sends this packet and then loops forever, yielding the message of every Publish object that comes in
     *
     * Any other incoming object is logged and disregarded, after which the loop sleeps for $idleMicroseconds before
     * checking the socket again. The generator never finishes by itself: the caller has to stop iterating.
     *
     * NOTE(review): this looks like the receive loop of a subscription; confirm that an endless loop is really the
     * intended behavior after sending an UNSUBSCRIBE.
     *
     * @param ClientInterface $client
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
     * @return \Generator
     * @throws \DomainException
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
     */
    public function loop(ClientInterface $client, int $idleMicroseconds = 100000): \Generator
    {
        // First of all: send this packet to the broker
        $this->logger->debug('Beginning loop', ['idleMicroseconds' => $idleMicroseconds]);
        $client->setBlocking(true);
        $readableContent = $client->sendData($this);
        // Set blocking explicitly to false due to messages not always arriving in the correct order
        $client->setBlocking(false);

        while (true) {
            $this->logger->debug('--- Loop ---');
            // Only if we receive a Publish event from the broker, yield the contents
            if ($readableContent instanceof Publish) {
                yield $readableContent->getMessage();
            } else {
                // Only wait for a certain amount of time if there was nothing in the queue
                $this->logger->info('Got an incoming object, disregarding', ['class' => \get_class($readableContent)]);
                usleep($idleMicroseconds);
            }

            $readableContent = $this->checkForEvent($client);
        }
    }

    /**
     * Sets the topics to unsubscribe from
     *
     * Despite its name, this replaces any previously set topics instead of appending to them.
     *
     * @param Topic[] $topics
     * @return self
     */
    public function addTopics(Topic ...$topics): self
    {
        $this->topics = $topics;
        $this->logger->debug('Topics added', ['totalTopics' => \count($this->topics)]);

        return $this;
    }

    /**
     * Sends a PingReq to the broker if the client indicates it is time to do so
     *
     * The socket is switched to blocking mode for the duration of the ping and back to non-blocking afterwards.
     *
     * @param ClientInterface $client
     * @return bool Always true
     */
    protected function checkPingTime(ClientInterface $client): bool
    {
        $this->logger->debug('Checking ping request time');
        if ($client->isItPingTime()) {
            $this->logger->notice('PingReq is needed, sending');
            $client->setBlocking(true);
            $client->sendData(new PingReq($this->logger));
            $client->setBlocking(false);
        }

        return true;
    }
}
186