1 | <?php |
||
30 | class AMQPBackendDispatcher extends QueueBackendDispatcher |
||
31 | { |
||
32 | /** |
||
33 | * @var array |
||
34 | */ |
||
35 | protected $settings; |
||
36 | |||
37 | /** |
||
38 | * @var AmqpConnectionFactory |
||
39 | */ |
||
40 | protected $connectionFactory; |
||
41 | |||
42 | /** |
||
43 | * @var AmqpContext |
||
44 | */ |
||
45 | protected $context; |
||
46 | |||
47 | /** |
||
48 | * @deprecated since 3.2, will be removed in 4.x |
||
49 | * |
||
50 | * @var AMQPChannel |
||
51 | */ |
||
52 | protected $channel; |
||
53 | |||
54 | /** |
||
55 | * @deprecated since 3.2, will be removed in 4.x |
||
56 | * |
||
57 | * @var AMQPConnection |
||
58 | */ |
||
59 | protected $connection; |
||
60 | |||
61 | protected $backendsInitialized = false; |
||
62 | |||
63 | /** |
||
64 | * @param array $settings |
||
65 | * @param array $queues |
||
66 | * @param string $defaultQueue |
||
67 | * @param array $backends |
||
68 | */ |
||
69 | public function __construct(array $settings, array $queues, $defaultQueue, array $backends) |
||
75 | |||
76 | /** |
||
77 | * @deprecated since 3.2, will be removed in 4.x |
||
78 | * |
||
79 | * @return AMQPChannel |
||
80 | */ |
||
public function getChannel()
{
    // Deprecated accessor: emits a silenced E_USER_DEPRECATED notice, then
    // lazily resolves and caches the AMQPChannel exposed by the context,
    // together with the connection that channel reports.
    @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);

    if (!$this->channel) {
        // getContext() builds the context on first use and returns the cached
        // instance afterwards, so one call both loads and fetches it.
        /** @var \Enqueue\AmqpLib\AmqpContext $context */
        $context = $this->getContext();

        $this->channel = $context->getLibChannel();
        $this->connection = $this->channel->getConnection();
    }

    return $this->channel;
}
||
98 | |||
99 | /** |
||
100 | * @return AmqpContext |
||
101 | */ |
||
public function getContext()
{
    // Lazily creates the AMQP context on first call and returns the cached
    // instance on every later call.
    //
    // Throws \RuntimeException when the enqueue/amqp-lib connection factory
    // class is not available.
    if (!$this->context) {
        if (!class_exists(\Enqueue\AmqpLib\AmqpConnectionFactory::class)) {
            throw new \RuntimeException('Please install enqueue/amqp-lib:^0.8 as a dependency');
        }

        // Only build a factory when none is stored on the instance yet. The
        // freshly configured factory must be kept in $this->connectionFactory,
        // otherwise createContext() below is invoked on null.
        if (!$this->connectionFactory) {
            /** @var AmqpConnectionFactory $factory */
            $factory = new \Enqueue\AmqpLib\AmqpConnectionFactory([
                'host' => $this->settings['host'],
                'port' => $this->settings['port'],
                'user' => $this->settings['user'],
                'pass' => $this->settings['pass'],
                'vhost' => $this->settings['vhost'],
            ]);

            if ($factory instanceof DelayStrategyAware) {
                $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
            }

            $this->connectionFactory = $factory;
        }

        $this->context = $this->connectionFactory->createContext();

        // shutdown() closes the context once the script terminates.
        register_shutdown_function([$this, 'shutdown']);
    }

    return $this->context;
}
||
128 | |||
129 | /** |
||
130 | * {@inheritdoc} |
||
131 | */ |
||
public function getBackend($type)
{
    // Resolves the backend registered for $type. Falls back to the backend of
    // the default queue; throws BackendNotFoundException when neither exists.

    // Registered backends are initialized once, on the first lookup.
    if (!$this->backendsInitialized) {
        foreach ($this->backends as $entry) {
            $entry['backend']->initialize();
        }

        $this->backendsInitialized = true;
    }

    // Without configured queues, prefer the backend registered as "default".
    if (0 === count($this->queues)) {
        foreach ($this->backends as $entry) {
            if ('default' === $entry['type']) {
                return $entry['backend'];
            }
        }
    }

    $fallback = null;

    foreach ($this->backends as $entry) {
        $entryType = $entry['type'];

        // Exact type match, or the untyped backend when "all" is requested.
        if ($entryType === $type || ('all' === $type && '' === $entryType)) {
            return $entry['backend'];
        }

        // Remember the default-queue backend in case nothing matches.
        if ($entryType === $this->defaultQueue) {
            $fallback = $entry['backend'];
        }
    }

    if (null !== $fallback) {
        return $fallback;
    }

    throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
}
||
171 | |||
172 | /** |
||
173 | * {@inheritdoc} |
||
174 | */ |
||
public function getIterator()
{
    // Not supported on the dispatcher: always throws \RuntimeException.
    $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

    throw new \RuntimeException($reason);
}
||
181 | |||
182 | /** |
||
183 | * {@inheritdoc} |
||
184 | */ |
||
public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
{
    // Not supported on the dispatcher: both arguments are ignored and a
    // \RuntimeException is always thrown.
    $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

    throw new \RuntimeException($reason);
}
||
191 | |||
192 | /** |
||
193 | * {@inheritdoc} |
||
194 | */ |
||
public function getStatus()
{
    // Health check: returns Failure when the context cannot be obtained, when
    // a configured queue is not reported by getApiQueueStatus(), or when a
    // reported queue has zero consumers; returns Success otherwise.
    try {
        $this->getContext();

        $reportedQueues = $this->getApiQueueStatus();
        $matched = 0;
        $withoutConsumers = [];

        foreach ($this->queues as $configured) {
            foreach ($reportedQueues as $reported) {
                if ($reported['name'] !== $configured['queue']) {
                    continue;
                }

                ++$matched;

                if (0 === $reported['consumers']) {
                    $withoutConsumers[] = $configured['queue'];
                }
            }
        }

        // Every configured queue has to show up in the reported list.
        if (count($this->queues) !== $matched) {
            return new Failure(
                'Not all queues for the available notification types registered in the rabbitmq broker. '
                .'Are the consumer commands running?'
            );
        }

        if ([] !== $withoutConsumers) {
            return new Failure(
                'There are no rabbitmq consumers running for the queues: '.implode(', ', $withoutConsumers)
            );
        }
    } catch (\Exception $exception) {
        // Any exception raised during the check is reported as a failed status.
        return new Failure($exception->getMessage());
    }

    return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
}
||
232 | |||
233 | /** |
||
234 | * {@inheritdoc} |
||
235 | */ |
||
public function cleanup()
{
    // Not supported on the dispatcher: always throws \RuntimeException.
    $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

    throw new \RuntimeException($reason);
}
||
242 | |||
public function shutdown()
{
    // Closes the context if one was created; does nothing otherwise.
    if (!$this->context) {
        return;
    }

    $this->context->close();
}
||
249 | |||
250 | /** |
||
251 | * {@inheritdoc} |
||
252 | */ |
||
public function initialize()
{
    // No-op: this method performs no work on the dispatcher itself.
}
||
256 | |||
257 | /** |
||
258 | * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues. |
||
259 | * |
||
260 | * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html |
||
261 | * |
||
262 | * @return array |
||
263 | */ |
||
264 | protected function getApiQueueStatus() |
||
280 | } |
||
281 |
If you suppress an error, we recommend checking for the error condition explicitly: