Completed
Push — master ( 1d7f97...fc052b )
by Camilo
01:42
created

Subscribe   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 139
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 13
dl 0
loc 139
ccs 0
cts 46
cp 0
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A createPayload() 0 9 2
A checkForEvent() 0 16 2
A expectAnswer() 0 7 1
A updateCommunication() 0 11 2
A addTopics() 0 5 1
A createVariableHeader() 0 5 1
A loop() 0 15 3
A shouldExpectAnswer() 0 3 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace unreal4u\MQTT\Protocol;
6
7
use unreal4u\MQTT\Application\EmptyReadableResponse;
8
use unreal4u\MQTT\Application\PayloadInterface;
9
use unreal4u\MQTT\Client;
10
use unreal4u\MQTT\Internals\ProtocolBase;
11
use unreal4u\MQTT\Internals\ReadableContentInterface;
12
use unreal4u\MQTT\Internals\WritableContent;
13
use unreal4u\MQTT\Internals\WritableContentInterface;
14
use unreal4u\MQTT\Protocol\Subscribe\Topic;
15
16
final class Subscribe extends ProtocolBase implements WritableContentInterface
17
{
18
    use WritableContent;
19
20
    const CONTROL_PACKET_VALUE = 8;
21
22
    public $packetIdentifier = 0;
23
24
    /**
25
     * An array of topics on which to subscribe to
26
     * @var Topic[]
27
     */
28
    private $topics = [];
29
30
    public function createVariableHeader(): string
31
    {
32
        // Subscribe must always send a 2 flag
33
        $this->specialFlags = 2;
34
        return \chr(0) . \chr($this->packetIdentifier);
35
    }
36
37
    public function createPayload(): string
38
    {
39
40
        $output = '';
41
        foreach ($this->topics as $topic) {
42
            // chr on QoS level is safe because it will create an 8-bit flag where the first 6 are only 0's
43
            $output .= $this->createUTF8String($topic->getTopicName()) . \chr($topic->getTopicQoSLevel());
44
        }
45
        return $output;
46
    }
47
48
    /**
49
     * QoS level 0 does not have to wait for a answer, so return false. Any other QoS level returns true
50
     * @return bool
51
     */
52
    public function shouldExpectAnswer(): bool
53
    {
54
        return true;
55
    }
56
57
    public function expectAnswer(string $data): ReadableContentInterface
58
    {
59
        $this->logger->info('String of incoming data confirmed, returning new object', ['class' => \get_class($this)]);
60
        $subAck = new SubAck($this->logger);
61
        $subAck->populate($data);
62
63
        return $subAck;
64
    }
65
66
    /**
67
     * Performs a check on the socket connection and returns either the contents or an empty object
68
     *
69
     * @param Client $client
70
     * @param PayloadInterface $payloadType
71
     * @return ReadableContentInterface
72
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
73
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
74
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
75
     */
76
    public function checkForEvent(Client $client, PayloadInterface $payloadType): ReadableContentInterface
77
    {
78
        $this->updateCommunication($client);
79
        $publishPacketControlField = $client->readSocketData(1);
80
        if ((\ord($publishPacketControlField) & 0xf0) > 0) {
81
            $restOfBytes = $client->readSocketData(1);
82
            $payload = $client->readSocketData(\ord($restOfBytes));
83
84
            $publish = new Publish($this->logger);
85
            $publish->setPayloadType($payloadType);
86
            $publish->populate($publishPacketControlField . $restOfBytes . $payload);
87
            return $publish;
88
        }
89
90
        $this->logger->debug('No valid publish packet control field found, returning empty response');
91
        return new EmptyReadableResponse($this->logger);
92
    }
93
94
    /**
95
     * Performs a simple loop and yields results back whenever they are available
96
     *
97
     * @param Client $client
98
     * @param PayloadInterface $payloadObject
99
     * @param int $idleMicroseconds
100
     * @return \Generator
101
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
102
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
103
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
104
     */
105
    public function loop(Client $client, PayloadInterface $payloadObject, int $idleMicroseconds = 100000): \Generator
106
    {
107
        // First of all: subscribe
108
        $client->sendData($this);
109
110
        // After we are successfully subscribed, start to listen for events
111
        while (true) {
112
            $readableContent = $this->checkForEvent($client, $payloadObject);
113
114
            // Only if we receive a Publish event from the broker, yield the contents
115
            if ($readableContent instanceof Publish) {
116
                yield $readableContent->getMessage();
117
            } else {
118
                // Only wait for a certain amount of time if there was nothing in the queue
119
                usleep($idleMicroseconds);
120
            }
121
        }
122
    }
123
124
    /**
125
     * A subscription is based on filters, this function allows us to pass on filters
126
     *
127
     * @param Topic[] $topics
128
     * @return Subscribe
129
     */
130
    public function addTopics(Topic ...$topics): Subscribe
131
    {
132
        $this->topics = $topics;
133
134
        return $this;
135
    }
136
137
    /**
138
     * @param Client $client
139
     * @return bool
140
     * @throws \unreal4u\MQTT\Exceptions\NotConnected
141
     * @throws \unreal4u\MQTT\Exceptions\Connect\NoConnectionParametersDefined
142
     * @throws \unreal4u\MQTT\Exceptions\ServerClosedConnection
143
     */
144
    private function updateCommunication(Client $client): bool
145
    {
146
        $this->logger->debug('Checking ping');
147
        if ($client->needsCommunication()) {
148
            $this->logger->notice('Sending ping');
149
            $client->setBlocking(true);
150
            $client->sendData(new PingReq($this->logger));
151
            $client->setBlocking(false);
152
        }
153
154
        return true;
155
    }
156
}
157