Completed
Pull Request — 1.0 (#4)
by Benoit
05:42
created

Client::createConnection()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 13
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 9
nc 3
nop 0
1
<?php
2
3
namespace Mouf\AmqpClient;
4
5
use PhpAmqpLib\Connection\AMQPStreamConnection;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
8
/**
 * Lazily opens a RabbitMq connection and channel, applies the optional QOS
 * settings and initialises every registered RabbitMq object on that channel.
 */
class Client
{
    /**
     * RabbitMq host.
     *
     * @var string
     */
    private $host;

    /**
     * RabbitMq port (defaults to 5672 when null is given to the constructor).
     *
     * @var int|string
     */
    private $port;

    /**
     * RabbitMq user.
     *
     * @var string
     */
    private $user;

    /**
     * RabbitMq password.
     *
     * @var string
     */
    private $password;

    /**
     * It's for QOS prefetch-size http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @var int|null
     */
    private $prefetchSize = null;

    /**
     * It's for QOS prefetch-count http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @var int|null
     */
    private $prefetchCount = null;

    /**
     * It's for QOS global http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     * Passed as-is as the third argument of basic_qos().
     *
     * @var bool|null
     */
    private $aGlobal = null;

    /**
     * RabbitMq connection (null until the first call to getChannel()).
     *
     * @var AMQPStreamConnection|null
     */
    private $connection = null;

    /**
     * RabbitMq channel (null until the first call to getChannel()).
     *
     * @var AMQPChannel|null
     */
    private $channel = null;

    /**
     * List of RabbitMq object.
     *
     * @var RabbitMqObjectInterface[]
     */
    private $rabbitMqObjects = [];

    /**
     * Maximum number of connection retries after the first failed attempt.
     */
    const MAX_RETRY = 10;

    /**
     * Time in seconds to wait between each retry.
     */
    const WAIT_TIME = 30;

    /**
     * @param string          $host     RabbitMq host
     * @param int|string|null $port     RabbitMq port; null selects the default 5672
     * @param string          $user     RabbitMq user
     * @param string          $password RabbitMq password
     */
    public function __construct($host, $port, $user, $password)
    {
        $this->host = $host;
        $this->port = ($port !== null) ? $port : 5672;
        $this->user = $user;
        $this->password = $password;
    }

    /**
     * Get prefetch size for QOS.
     *
     * @return int|null
     */
    public function getPrefetchSize()
    {
        return $this->prefetchSize;
    }

    /**
     * Set prefetch size
     * It's for QOS prefetch-size http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @param int $prefetchSize
     *
     * @return $this
     */
    public function setPrefetchSize($prefetchSize)
    {
        $this->prefetchSize = $prefetchSize;

        return $this;
    }

    /**
     * Get prefetch count for QOS.
     *
     * @return int|null
     */
    public function getPrefetchCount()
    {
        return $this->prefetchCount;
    }

    /**
     * Set prefetch count
     * It's for QOS prefetch-count http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @param int $prefetchCount
     *
     * @return $this
     */
    public function setPrefetchCount($prefetchCount)
    {
        $this->prefetchCount = $prefetchCount;

        return $this;
    }

    /**
     * Get a global for QOS.
     *
     * @return bool|null
     */
    public function getAGlobal()
    {
        return $this->aGlobal;
    }

    /**
     * Set global
     * It's for QOS global http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @param bool $aGlobal
     *
     * @return $this
     */
    public function setAGlobal($aGlobal)
    {
        $this->aGlobal = $aGlobal;

        return $this;
    }

    /**
     * Replace the whole list of RabbitMq objects.
     *
     * @param RabbitMqObjectInterface[] $rabbitMqObjects
     */
    public function setRabbitMqObjects(array $rabbitMqObjects)
    {
        $this->rabbitMqObjects = $rabbitMqObjects;
    }

    /**
     * Add a RabbitMq object to the list unless that same instance is already registered.
     *
     * @param RabbitMqObjectInterface $object
     */
    public function register(RabbitMqObjectInterface $object)
    {
        if (!in_array($object, $this->rabbitMqObjects, true)) {
            $this->rabbitMqObjects[] = $object;
        }
    }

    /**
     * Create $this->connection, retrying when the RabbitMq server is not ready yet.
     *
     * After a failed attempt the method waits WAIT_TIME seconds and tries again,
     * at most MAX_RETRY times. The retry counter is local to the call and is
     * incremented before each new attempt, so the limit is actually enforced.
     *
     * @throws \ErrorException the last connection error once MAX_RETRY retries have failed
     */
    private function createConnection()
    {
        $retries = 0;

        while (true) {
            try {
                $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);

                return;
            } catch (\ErrorException $e) {
                if ($retries >= self::MAX_RETRY) {
                    throw $e;
                }

                ++$retries;
                sleep(self::WAIT_TIME);
            }
        }
    }

    /**
     * Connection to the RabbitMq service with AMQPStreamConnection.
     *
     * On first use this opens the connection and a channel, applies basic_qos
     * when at least one QOS setting was provided, and calls init() on every
     * registered RabbitMq object. Later calls return the same channel.
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        // Guard on the channel rather than the connection: if opening the channel
        // failed on a previous call, the next call tries again instead of returning null.
        if ($this->channel === null) {
            if ($this->connection === null) {
                $this->createConnection();
            }

            $this->channel = $this->connection->channel();

            if ($this->prefetchSize !== null || $this->prefetchCount !== null || $this->aGlobal !== null) {
                $this->channel->basic_qos($this->prefetchSize, $this->prefetchCount, $this->aGlobal);
            }

            foreach ($this->rabbitMqObjects as $rabbitMqObject) {
                $rabbitMqObject->init($this->channel);
            }
        }

        return $this->channel;
    }

    /**
     * Returns the list of registered queues.
     *
     * The original keys of the registered objects are preserved (array_filter
     * does not reindex), so the result is not necessarily a 0-based list.
     *
     * @return QueueInterface[]
     */
    public function getQueues()
    {
        return array_filter($this->rabbitMqObjects, function (RabbitMqObjectInterface $object) {
            return $object instanceof QueueInterface;
        });
    }
}
235