1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Mouf\AmqpClient; |
4
|
|
|
|
5
|
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection; |
6
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
7
|
|
|
use Mouf\AmqpClient\Exception\ConnectionException; |
8
|
|
|
|
9
|
|
|
/**
 * Entry point to a RabbitMq broker.
 *
 * The client stores the connection settings, opens the connection lazily on the
 * first call to getChannel(), applies the optional QOS settings and initialises
 * every registered RabbitMq object against the channel it created.
 */
class Client
{
    /**
     * RabbitMq host.
     *
     * @var string
     */
    private $host;

    /**
     * RabbitMq port (5672 when the constructor receives null).
     *
     * @var int|string
     */
    private $port;

    /**
     * RabbitMq user.
     *
     * @var string
     */
    private $user;

    /**
     * RabbitMq password.
     *
     * @var string
     */
    private $password;

    /**
     * QOS prefetch-size, null when not configured.
     *
     * @see http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos
     *
     * @var int|null
     */
    private $prefetchSize = null;

    /**
     * QOS prefetch-count, null when not configured.
     *
     * @see http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos
     *
     * @var int|null
     */
    private $prefetchCount = null;

    /**
     * QOS global flag, null when not configured.
     *
     * @see http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos
     *
     * @var bool|int|null
     */
    private $aGlobal = null;

    /**
     * RabbitMq connection, null until getChannel() opens it.
     *
     * @var AMQPStreamConnection|null
     */
    private $connection = null;

    /**
     * RabbitMq channel, null until getChannel() creates it.
     *
     * @var AMQPChannel|null
     */
    private $channel = null;

    /**
     * List of RabbitMq objects initialised when the channel is created.
     *
     * @var RabbitMqObjectInterface[]
     */
    private $rabbitMqObjects = [];

    /**
     * Stores the connection settings; no connection is opened here.
     *
     * @param string          $host
     * @param int|string|null $port     Falls back to 5672 when null
     * @param string          $user
     * @param string          $password
     */
    public function __construct($host, $port, $user, $password)
    {
        $this->host = $host;
        $this->port = ($port !== null) ? $port : 5672;
        $this->user = $user;
        $this->password = $password;
    }

    /**
     * Get prefetch size for QOS.
     *
     * @return int|null
     */
    public function getPrefetchSize()
    {
        return $this->prefetchSize;
    }

    /**
     * Set prefetch size
     * It's for QOS prefetch-size http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @param int $prefetchSize
     *
     * @return $this
     */
    public function setPrefetchSize($prefetchSize)
    {
        $this->prefetchSize = $prefetchSize;

        return $this;
    }

    /**
     * Get prefetch count for QOS.
     *
     * @return int|null
     */
    public function getPrefetchCount()
    {
        return $this->prefetchCount;
    }

    /**
     * Set prefetch count
     * It's for QOS prefetch-count http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @param int $prefetchCount
     *
     * @return $this
     */
    public function setPrefetchCount($prefetchCount)
    {
        $this->prefetchCount = $prefetchCount;

        return $this;
    }

    /**
     * Get the global flag for QOS.
     *
     * @return bool|int|null
     */
    public function getAGlobal()
    {
        return $this->aGlobal;
    }

    /**
     * Set global
     * It's for QOS global http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.
     *
     * @param bool|int $aGlobal
     *
     * @return $this
     */
    public function setAGlobal($aGlobal)
    {
        $this->aGlobal = $aGlobal;

        return $this;
    }

    /**
     * Replaces the whole list of RabbitMq objects.
     *
     * @param RabbitMqObjectInterface[] $rabbitMqObjects
     */
    public function setRabbitMqObjects(array $rabbitMqObjects)
    {
        $this->rabbitMqObjects = $rabbitMqObjects;
    }

    /**
     * Adds a RabbitMq object to the list unless that exact instance is already in it.
     *
     * @param RabbitMqObjectInterface $object
     */
    public function register(RabbitMqObjectInterface $object)
    {
        if (!in_array($object, $this->rabbitMqObjects, true)) {
            $this->rabbitMqObjects[] = $object;
        }
    }

    /**
     * Returns the channel, opening the connection and creating the channel on first use.
     *
     * When the channel is created, the QOS settings are applied (if at least one
     * of them was configured) and init() is called on every registered object.
     *
     * @return AMQPChannel
     *
     * @throws ConnectionException when the connection is refused
     * @throws \ErrorException     for any other error raised while connecting
     */
    public function getChannel()
    {
        // Guard on the channel rather than on the connection: if a previous call
        // opened the connection but failed to create the channel, retry here
        // instead of returning null forever.
        if ($this->channel !== null) {
            return $this->channel;
        }

        if ($this->connection === null) {
            try {
                $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
            } catch (\ErrorException $e) {
                /* We are trying to catch the exception when the connection is refused */
                if (preg_match("/.*unable to connect.*Connection refused.*/", $e->__toString())) {
                    throw new ConnectionException("Cannot create the connection", 404, $e);
                }
                throw $e;
            }
        }

        // The channel is stored before the objects are initialised so that any
        // nested getChannel() call made during init() gets the same channel.
        $this->channel = $this->connection->channel();

        if ($this->prefetchSize !== null || $this->prefetchCount !== null || $this->aGlobal !== null) {
            // Settings left unconfigured are sent as 0 / false rather than null.
            $this->channel->basic_qos((int) $this->prefetchSize, (int) $this->prefetchCount, (bool) $this->aGlobal);
        }

        foreach ($this->rabbitMqObjects as $rabbitMqObject) {
            $rabbitMqObject->init($this->channel);
        }

        return $this->channel;
    }

    /**
     * Returns the list of registered queues.
     *
     * Keys are those of the internal object list (array_filter() preserves
     * them), so the result is not necessarily indexed from 0.
     *
     * @return QueueInterface[]
     */
    public function getQueues()
    {
        return array_filter($this->rabbitMqObjects, function (RabbitMqObjectInterface $object) {
            return $object instanceof QueueInterface;
        });
    }
}
212
|
|
|
|
Scrutinizer analyzes your composer.json/composer.lock file, if available, to determine the classes and functions that are defined by your dependencies. It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.