Completed
Pull Request — master (#276)
by Maksim
08:40
created

AMQPBackendDispatcher::getStatus()   C

Complexity

Conditions 8
Paths 12

Size

Total Lines 37
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 37
rs 5.3846
c 0
b 0
f 0
cc 8
eloc 22
nc 12
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Guzzle\Http\Client as GuzzleClient;
17
use Interop\Amqp\AmqpContext;
18
use Interop\Amqp\AmqpConnectionFactory;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Connection\AMQPConnection;
21
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
22
use Sonata\NotificationBundle\Model\MessageInterface;
23
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
24
use ZendDiagnostics\Result\Failure;
25
use ZendDiagnostics\Result\Success;
26
27
/**
28
 * Producer side of the rabbitmq backend.
29
 */
30
class AMQPBackendDispatcher extends QueueBackendDispatcher
31
{
32
    /**
     * Backend settings. getContext() reads the factory_class, host, port, user, pass and vhost
     * keys; getApiQueueStatus() reads console_url, user and pass.
     *
     * @var array
     */
    protected $settings;

    /**
     * Factory used by getContext() to create the AMQP context.
     *
     * @var AmqpConnectionFactory
     */
    protected $connectionFactory;

    /**
     * Channel cached by getChannel().
     *
     * @deprecated since 3.2, will be removed in 4.x
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Connection of the cached channel, set by getChannel().
     *
     * @deprecated since 3.2, will be removed in 4.x
     *
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * Context cached by getContext() and closed by shutdown().
     *
     * @var AmqpContext
     */
    protected $context;

    /**
     * Set to true once getBackend() has called initialize() on every registered backend.
     *
     * @var bool
     */
    protected $backendsInitialized = false;
62
63
    /**
     * Forwards the queue configuration to the parent dispatcher and keeps the backend settings.
     *
     * @param array  $settings     backend settings, stored on this instance
     * @param array  $queues       passed through to the parent constructor
     * @param string $defaultQueue passed through to the parent constructor
     * @param array  $backends     passed through to the parent constructor
     */
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
    {
        parent::__construct($queues, $defaultQueue, $backends);
        $this->settings = $settings;
    }
75
76
    /**
     * Returns the AMQP context, creating and caching it on first use.
     *
     * On the first call the class named by the "factory_class" setting is instantiated with the
     * host, port, user, pass and vhost settings, stored in $connectionFactory and asked for a
     * context. shutdown() is then registered as a PHP shutdown function so the context is closed
     * when the script ends. Later calls return the cached context.
     *
     * @throws \LogicException when "factory_class" is missing, is not an existing class, or does
     *                         not implement AmqpConnectionFactory
     *
     * @return AmqpContext
     */
    public function getContext()
    {
        if ($this->context) {
            return $this->context;
        }

        if (!array_key_exists('factory_class', $this->settings)) {
            throw new \LogicException('The factory_class option is missing though it is required.');
        }

        $factoryClass = $this->settings['factory_class'];
        if (
            !class_exists($factoryClass) ||
            !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
        ) {
            throw new \LogicException(sprintf(
                'The factory_class option has to be valid class that implements "%s"',
                AmqpConnectionFactory::class
            ));
        }

        /** @var AmqpConnectionFactory $factory */
        $factory = new $factoryClass([
            'host' => $this->settings['host'],
            'port' => $this->settings['port'],
            'user' => $this->settings['user'],
            'pass' => $this->settings['pass'],
            'vhost' => $this->settings['vhost'],
        ]);

        if ($factory instanceof DelayStrategyAware) {
            $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
        }

        // The factory has to be stored before it is used: the property was previously never
        // assigned, so createContext() ended up being called on null.
        $this->connectionFactory = $factory;
        $this->context = $this->connectionFactory->createContext();

        register_shutdown_function([$this, 'shutdown']);

        return $this->context;
    }
117
118
    /**
     * {@inheritdoc}
     */
    public function getBackend($type)
    {
        // Initialize every registered backend once, on the first lookup.
        if (!$this->backendsInitialized) {
            foreach ($this->backends as $entry) {
                $entry['backend']->initialize();
            }
            $this->backendsInitialized = true;
        }

        // Without any configured queue, the backend registered as "default" wins.
        if (0 === count($this->queues)) {
            foreach ($this->backends as $entry) {
                if ('default' === $entry['type']) {
                    return $entry['backend'];
                }
            }
        }

        $fallback = null;

        foreach ($this->backends as $entry) {
            $entryType = $entry['type'];
            // A request for "all" is served by the backend registered with an empty type.
            $isCatchAll = 'all' === $type && '' === $entryType;

            if ($isCatchAll || $entryType === $type) {
                return $entry['backend'];
            }

            // Remember the default-queue backend in case no exact match shows up.
            if ($entryType === $this->defaultQueue) {
                $fallback = $entry['backend'];
            }
        }

        if (null !== $fallback) {
            return $fallback;
        }

        throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
    }
160
161
    /**
     * {@inheritdoc}
     *
     * Always throws: this implementation never returns an iterator.
     *
     * @throws \RuntimeException on every call
     */
    public function getIterator()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
170
171
    /**
     * {@inheritdoc}
     *
     * Always throws: neither argument is used by this implementation.
     *
     * @throws \RuntimeException on every call
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
180
181
    /**
     * {@inheritdoc}
     *
     * Opens the context, fetches the queue list from getApiQueueStatus() and compares it with the
     * configured queues. Returns a Failure when the number of matches differs from the number of
     * configured queues, when a matched queue reports zero consumers, or when an exception is
     * thrown along the way; returns a Success otherwise.
     */
    public function getStatus()
    {
        try {
            $this->getContext();
            $brokerQueues = $this->getApiQueueStatus();

            $matched = 0;
            $idleQueues = [];

            foreach ($this->queues as $configured) {
                foreach ($brokerQueues as $reported) {
                    if ($reported['name'] !== $configured['queue']) {
                        continue;
                    }

                    ++$matched;

                    if (0 === $reported['consumers']) {
                        $idleQueues[] = $configured['queue'];
                    }
                }
            }

            if (count($this->queues) !== $matched) {
                return new Failure(
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
                    .'Are the consumer commands running?'
                );
            }

            if ([] !== $idleQueues) {
                return new Failure(
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $idleQueues)
                );
            }
        } catch (\Exception $exception) {
            // Any exception raised while checking is reported through its message.
            return new Failure($exception->getMessage());
        }

        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
    }
221
222
    /**
     * {@inheritdoc}
     *
     * Always throws: this implementation performs no cleanup.
     *
     * @throws \RuntimeException on every call
     */
    public function cleanup()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
231
232
    /**
     * Closes the AMQP context if one has been created.
     *
     * getContext() registers this method as a PHP shutdown function.
     */
    public function shutdown()
    {
        if (!$this->context) {
            return;
        }

        $this->context->close();
    }
238
239
    /**
     * {@inheritdoc}
     */
    public function initialize()
    {
        // No-op.
    }
245
246
    /**
     * Returns the PhpAmqpLib channel behind the context.
     *
     * Every call triggers an E_USER_DEPRECATED notice. On first use the channel is read from the
     * context through getLibChannel() and cached together with its connection; later calls return
     * the cached channel.
     *
     * NOTE(review): this annotation says "since 3.2 / removed in 4.x" while the runtime notice
     * says "since version 3.3 / removed in 4.0" - confirm which is correct and align them.
     *
     * @deprecated since 3.2, will be removed in 4.x
     *
     * @throws \LogicException when the context is not an \Enqueue\AmqpLib\AmqpContext
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        // The @ operator suppresses PHP's default reporting of the deprecation notice.
        @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);

        if (!$this->channel) {
            // A single getContext() call both creates the context (if needed) and returns it.
            /** @var \Enqueue\AmqpLib\AmqpContext $context */
            $context = $this->getContext();
            if (!$context instanceof \Enqueue\AmqpLib\AmqpContext) {
                throw new \LogicException(sprintf('The context is not instance of "%s"', \Enqueue\AmqpLib\AmqpContext::class));
            }

            $this->channel = $context->getLibChannel();
            $this->connection = $this->channel->getConnection();
        }

        return $this->channel;
    }
271
272
    /**
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
     *
     * The request is sent to "<console_url>/queues" with the configured user and pass as
     * credentials and a 3000 ms connect timeout. The response body is decoded from JSON into
     * associative arrays.
     *
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
     *
     * @throws \RuntimeException when the Guzzle client class is not available or the response
     *                           body does not decode to an array
     *
     * @return array
     */
    protected function getApiQueueStatus()
    {
        if (!class_exists(GuzzleClient::class)) {
            // Guzzle\Http\Client is shipped by the guzzle/guzzle (3.x) package; guzzlehttp/guzzle
            // provides GuzzleHttp\Client and would not satisfy this check.
            throw new \RuntimeException(
                'The guzzle http client library is required to run rabbitmq health checks. '
                .'Make sure to add guzzle/guzzle to your composer.json.'
            );
        }

        $client = new GuzzleClient();
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
        $request->setAuth($this->settings['user'], $this->settings['pass']);

        $queues = json_decode($request->send()->getBody(true), true);

        // json_decode() yields null for a malformed body (and scalars for scalar JSON); fail
        // loudly instead of handing a non-array to callers that iterate over the result.
        if (!is_array($queues)) {
            throw new \RuntimeException('The rabbitmq management api did not return a valid JSON list of queues.');
        }

        return $queues;
    }
295
}
296