Passed
Push — master ( fc052b...13a97b )
by Camilo
02:52
created

Subscribe::getPacketIdentifier()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\PayloadInterface;
9
use unreal4u\MQTT\Application\SimplePayload;
10
use unreal4u\MQTT\Client;
11
use unreal4u\MQTT\Internals\ClientInterface;
12
use unreal4u\MQTT\Internals\ProtocolBase;
13
use unreal4u\MQTT\Internals\ReadableContentInterface;
14
use unreal4u\MQTT\Internals\WritableContent;
15
use unreal4u\MQTT\Internals\WritableContentInterface;
16
use unreal4u\MQTT\Protocol\Subscribe\Topic;
17
use unreal4u\MQTT\Utilities;
18
19
final class Subscribe extends ProtocolBase implements WritableContentInterface
20
{
21
    use WritableContent;
22
23
    const CONTROL_PACKET_VALUE = 8;
24
25
    private $packetIdentifier = 0;
26
27
    /**
28
     * An array of topics on which to subscribe to
29
     * @var Topic[]
30
     */
31
    private $topics = [];
32
33
    /**
34
     * @return string
35
     * @throws \OutOfRangeException
36
     * @throws \Exception
37
     */
38
    public function createVariableHeader(): string
39
    {
40
        // Subscribe must always send a 2 flag
41
        $this->specialFlags = 2;
42
43
        // Assign a packet identifier automatically if none has been assigned yet
44
        if ($this->packetIdentifier === 0) {
45
            $this->setPacketIdentifier(random_int(0, 65535));
46
        }
47
48
        return Utilities::convertNumberToBinaryString($this->packetIdentifier);
49
    }
50
51
    public function createPayload(): string
52
    {
53
        $output = '';
54
        foreach ($this->topics as $topic) {
55
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
56
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
57
        }
58
        return $output;
59
    }
60
61
    /**
62
     * When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond with a SUBACK Packet
63
     *
64
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718134 (MQTT-3.8.4-1)
65
     * @return bool
66
     */
67
    public function shouldExpectAnswer(): bool
68
    {
69
        return true;
70
    }
71
72
    public function expectAnswer(string $data, ClientInterface $client): ReadableContentInterface
73
    {
74
        $this->logger->info('String of incoming data confirmed, returning new object', ['class' => \get_class($this)]);
75
76
        if (\ord($data[0]) >> 4 === 9) {
77
            $returnObject = new SubAck($this->logger);
78
        } elseif (\ord($data[0]) >> 4 === 3) {
79
            $returnObject = new Publish($this->logger);
80
            $returnObject->setPayloadType(new SimplePayload());
81
        }
82
        $returnObject->populate($data);
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $returnObject does not seem to be defined for all execution paths leading up to this point.
Loading history...
83
84
        return $returnObject;
85
    }
86
87
    /**
88
     * SUBSCRIBE Control Packets MUST contain a non-zero 16-bit Packet Identifier
89
     *
90
     * @param int $packetIdentifier
91
     * @return Subscribe
92
     * @throws \OutOfRangeException
93
     */
94
    public function setPacketIdentifier(int $packetIdentifier): Subscribe
95
    {
96
        if ($packetIdentifier > 65535 || $packetIdentifier < 1) {
97
            throw new \OutOfRangeException('Packet identifier must fit within 2 bytes');
98
        }
99
100
        $this->packetIdentifier = $packetIdentifier;
101
102
        return $this;
103
    }
104
105
    public function getPacketIdentifier(): int
106
    {
107
        return $this->packetIdentifier;
108
    }
109
110
    /**
111
     * Performs a check on the socket connection and returns either the contents or an empty object
112
     *
113
     * @param Client $client
114
     * @param PayloadInterface $payloadType
115
     * @return ReadableContentInterface
116
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
117
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
118
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
119
     */
120
    public function checkForEvent(Client $client, PayloadInterface $payloadType): ReadableContentInterface
121
    {
122
        $this->updateCommunication($client);
123
        $publishPacketControlField = $client->readSocketData(1);
124
        if ((\ord($publishPacketControlField) & 0xf0) > 0) {
125
            $restOfBytes = $client->readSocketData(1);
126
            $payload = $client->readSocketData(\ord($restOfBytes));
127
128
            $publish = new Publish($this->logger);
129
            $publish->setPayloadType($payloadType);
130
            $publish->populate($publishPacketControlField . $restOfBytes . $payload);
131
            return $publish;
132
        }
133
134
        $this->logger->debug('No valid publish packet control field found, returning empty response');
135
        return new EmptyReadableResponse($this->logger);
136
    }
137
138
    /**
139
     * Performs a simple loop and yields results back whenever they are available
140
     *
141
     * @param Client $client
142
     * @param PayloadInterface $payloadObject
143
     * @param int $idleMicroseconds The amount of microseconds the watcher should wait before checking the socket again
144
     * @return \Generator
145
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
146
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
147
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
148
     */
149
    public function loop(Client $client, PayloadInterface $payloadObject, int $idleMicroseconds = 100000): \Generator
150
    {
151
        // First of all: subscribe
152
        // FIXME: The Server is permitted to start sending PUBLISH packets matching the Subscription before it sends the SUBACK Packet.
153
        $client->sendData($this);
154
155
        // After we are successfully subscribed, start to listen for events
156
        while (true) {
157
            $readableContent = $this->checkForEvent($client, $payloadObject);
158
159
            // Only if we receive a Publish event from the broker, yield the contents
160
            if ($readableContent instanceof Publish) {
161
                yield $readableContent->getMessage();
162
            } else {
163
                // Only wait for a certain amount of time if there was nothing in the queue
164
                usleep($idleMicroseconds);
165
            }
166
        }
167
    }
168
169
    /**
170
     * A subscription is based on filters, this function allows us to pass on filters
171
     *
172
     * @param Topic[] $topics
173
     * @return Subscribe
174
     */
175
    public function addTopics(Topic ...$topics): Subscribe
176
    {
177
        $this->topics = $topics;
178
        $this->logger->debug('One or more topics have been added', ['totalTopics', count($this->topics)]);
179
180
        return $this;
181
    }
182
183
    /**
184
     * @param Client $client
185
     * @return bool
186
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
187
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
188
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
189
     */
190
    private function updateCommunication(Client $client): bool
191
    {
192
        $this->logger->debug('Checking ping');
193
        if ($client->isItPingTime()) {
194
            $this->logger->notice('Sending ping');
195
            $client->setBlocking(true);
196
            $client->sendData(new PingReq($this->logger));
197
            $client->setBlocking(false);
198
        }
199
200
        return true;
201
    }
202
}
203