Completed
Pull Request — 1.1 (#12)
by David
02:00
created

Client::setPrefetchSize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
3
namespace Mouf\AmqpClient;
4
5
use PhpAmqpLib\Connection\AMQPSocketConnection;
6
use PhpAmqpLib\Connection\AMQPStreamConnection;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use Mouf\AmqpClient\Exception\ConnectionException;
9
10
class Client
11
{
12
    /**
13
     * RabbitMq host.
14
     *
15
     * @var string
16
     */
17
    private $host;
18
19
    /**
20
     * RabbitMq port.
21
     *
22
     * @var string
23
     */
24
    private $port;
25
26
    /**
27
     * RabbitMq user.
28
     *
29
     * @var string
30
     */
31
    private $user;
32
33
    /**
34
     * RabbitMq password.
35
     *
36
     * @var string
37
     */
38
    private $password;
39
40
    /**
41
     * It's for QOS prefetch-size http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
42
     *
43
     * @var int
44
     */
45
    private $prefetchSize = null;
46
47
    /**
48
     * It's for QOS prefetch-count http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
49
     *
50
     * @var int
51
     */
52
    private $prefetchCount = null;
53
54
    /**
55
     * It's for QOS global http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
56
     *
57
     * @var int
58
     */
59
    private $aGlobal = null;
60
61
    /**
62
     * RabbitMq connection.
63
     *
64
     * @var AMQPStreamConnection
65
     */
66
    private $connection = null;
67
68
    /**
69
     * RabbitMq channel.
70
     *
71
     * @var \AMQPChannel
72
     */
73
    private $channel = null;
74
75
    /**
76
     * List of RabbitMq object.
77
     *
78
     * @var RabbitMqObjectInterface[]
79
     */
80
    private $rabbitMqObjects = [];
81
82
    public function __construct($host, $port, $user, $password)
83
    {
84
        $this->host = $host;
85
        $this->port = ($port !== null) ? $port : 5672;
86
        $this->user = $user;
87
        $this->password = $password;
88
    }
89
90
    /**
91
     * Get prefetch size for QOS.
92
     */
93
    public function getPrefetchSize()
94
    {
95
        return $this->prefetchSize;
96
    }
97
98
    /**
99
     * Set prefetch size
100
     * It's for QOS prefetch-size http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
101
     *
102
     * @param int $prefetchSize
103
     */
104
    public function setPrefetchSize($prefetchSize)
105
    {
106
        $this->prefetchSize = $prefetchSize;
107
108
        return $this;
109
    }
110
111
    /**
112
     * Get prefetch count for QOS.
113
     */
114
    public function getPrefetchCount()
115
    {
116
        return $this->prefetchCount;
117
    }
118
119
    /**
120
     * Set prefetch size
121
     * It's for QOS prefetch-size http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
122
     *
123
     * @param int $prefetchCount
124
     */
125
    public function setPrefetchCount($prefetchCount)
126
    {
127
        $this->prefetchCount = $prefetchCount;
128
129
        return $this;
130
    }
131
132
    /**
133
     * Get a global for QOS.
134
     */
135
    public function getAGlobal()
136
    {
137
        return $this->aGlobal;
138
    }
139
140
    /**
141
     * Set global
142
     * It's for QOS prefetch-size http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
143
     *
144
     * @param int $aGlobal
145
     */
146
    public function setAGlobal($aGlobal)
147
    {
148
        $this->aGlobal = $aGlobal;
149
150
        return $this;
151
    }
152
153
    /**
154
     * Set RabbitMq object.
155
     *
156
     * @param RabbitMqObjectInterface[] $rabbitMqObjects
157
     */
158
    public function setRabbitMqObjects(array $rabbitMqObjects)
159
    {
160
        $this->rabbitMqObjects = $rabbitMqObjects;
161
    }
162
163
    public function register(RabbitMqObjectInterface $object)
164
    {
165
        if (!in_array($object, $this->rabbitMqObjects, true)) {
166
            $this->rabbitMqObjects[] = $object;
167
        }
168
    }
169
170
    /**
171
     * Connection to the RabbitMq service with AMQPStreamConnection.
172
     *
173
     * @return AMQPChannel
174
     */
175
    public function getChannel()
176
    {
177
        if (!$this->connection) {
178
            try {
179
                if (function_exists('socket_create')) {
180
                    $this->connection = new AMQPSocketConnection($this->host, $this->port, $this->user, $this->password);
0 ignored issues
show
Documentation Bug introduced by
It seems like new \PhpAmqpLib\Connecti...>user, $this->password) of type object<PhpAmqpLib\Connec...n\AMQPSocketConnection> is incompatible with the declared type object<PhpAmqpLib\Connec...n\AMQPStreamConnection> of property $connection.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
181
                } else {
182
                    $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
183
                }
184
            } catch (\ErrorException $e) {
0 ignored issues
show
Bug introduced by
The class ErrorException does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
185
                /* We are trying to catch the exception when the connection if refused */
186
                if (preg_match("/.*unable to connect.*Connection refused.*/", $e->__toString())) {
187
                    throw new ConnectionException("Cannot create the connection", 404, $e);
188
                }
189
                throw $e;
190
            } catch (\AMQPIOException $e) {
0 ignored issues
show
Bug introduced by
The class AMQPIOException does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
191
                throw new ConnectionException("Cannot create the connection", 404, $e);
192
            }
193
            $this->channel = $this->connection->channel();
194
195
            if ($this->prefetchSize !== null || $this->prefetchCount !== null || $this->aGlobal !== null) {
196
                $this->channel->basic_qos($this->prefetchSize, $this->prefetchCount, $this->aGlobal);
197
            }
198
199
            foreach ($this->rabbitMqObjects as $rabbitMqObject) {
200
                $rabbitMqObject->init($this->channel);
201
            }
202
        }
203
204
        return $this->channel;
205
    }
206
207
    /**
208
     * Returns the list of registered queues.
209
     *
210
     * @return QueueInterface[]
211
     */
212
    public function getQueues()
213
    {
214
        return array_filter($this->rabbitMqObjects, function (RabbitMqObjectInterface $object) {
215
            return $object instanceof QueueInterface;
216
        });
217
    }
218
}
219