Completed
Pull Request — master (#281)
by Grégoire
01:45
created

AMQPBackendDispatcher::getChannel()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 21
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 21
rs 9.3142
c 0
b 0
f 0
cc 3
eloc 10
nc 3
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Guzzle\Http\Client as GuzzleClient;
17
use Interop\Amqp\AmqpConnectionFactory;
18
use Interop\Amqp\AmqpContext;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Connection\AMQPConnection;
21
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
22
use Sonata\NotificationBundle\Model\MessageInterface;
23
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
24
use ZendDiagnostics\Result\Failure;
25
use ZendDiagnostics\Result\Success;
26
27
/**
28
 * Producer side of the rabbitmq backend.
29
 */
30
class AMQPBackendDispatcher extends QueueBackendDispatcher
31
{
32
    /**
33
     * @var array
34
     */
35
    protected $settings;
36
37
    /**
38
     * @deprecated since 3.2, will be removed in 4.x
39
     *
40
     * @var AMQPChannel
41
     */
42
    protected $channel;
43
44
    /**
45
     * @deprecated since 3.2, will be removed in 4.x
46
     *
47
     * @var AMQPConnection
48
     */
49
    protected $connection;
50
51
    protected $backendsInitialized = false;
52
53
    /**
54
     * @var AmqpConnectionFactory
55
     */
56
    private $connectionFactory;
57
58
    /**
59
     * @var AmqpContext
60
     */
61
    private $context;
62
63
    /**
     * Hands the queue configuration to the parent dispatcher and keeps the connection settings.
     *
     * @param array  $settings     Connection settings; this class reads the keys factory_class, host,
     *                             port, user, pass, vhost and console_url from it
     * @param array  $queues       Forwarded unchanged to the parent constructor
     * @param string $defaultQueue Forwarded unchanged to the parent constructor
     * @param array  $backends     Forwarded unchanged to the parent constructor
     */
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
    {
        parent::__construct($queues, $defaultQueue, $backends);

        // Only stored here; the settings are first used when the context is created.
        $this->settings = $settings;
    }
75
76
    /**
     * Backward-compatibility accessor for the php-amqplib channel behind the current context.
     *
     * The channel and its connection are resolved on the first call and cached in the
     * deprecated $channel and $connection properties.
     *
     * NOTE(review): this docblock says "since 3.2 / removed in 4.x" while the runtime
     * deprecation message says "3.3 / 4.0" - confirm which version pair is correct.
     *
     * @deprecated since 3.2, will be removed in 4.x
     *
     * @throws \LogicException if the context cannot be created or is not an
     *                         \Enqueue\AmqpLib\AmqpContext
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        // The notice is silenced with @ on purpose: silenced deprecations only reach
        // error handlers that explicitly opt in to them.
        @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);

        if (!$this->channel) {
            // The context is created lazily by getContext(), so it has to be loaded BEFORE
            // its type is checked; inspecting $this->context first would always fail on a
            // dispatcher whose context has not been requested yet.
            $context = $this->getContext();

            if (!$context instanceof \Enqueue\AmqpLib\AmqpContext) {
                throw new \LogicException('The BC layer works only if enqueue/amqp-lib lib is being used.');
            }

            $this->channel = $context->getLibChannel();
            $this->connection = $this->channel->getConnection();
        }

        return $this->channel;
    }
102
103
    /**
     * Returns the AMQP context, creating it on first use.
     *
     * On the first call the class named by the "factory_class" setting is instantiated with
     * the host, port, user, pass and vhost settings, a RabbitMQ DLX delay strategy is attached
     * when the factory is delay-strategy aware, and shutdown() is registered as a PHP shutdown
     * function. Later calls return the stored context.
     *
     * @throws \LogicException if factory_class is missing, is not an existing class, or does
     *                         not implement AmqpConnectionFactory
     *
     * @return AmqpContext
     */
    final public function getContext()
    {
        if ($this->context) {
            return $this->context;
        }

        if (!array_key_exists('factory_class', $this->settings)) {
            throw new \LogicException('The factory_class option is missing though it is required.');
        }

        $factoryClass = $this->settings['factory_class'];

        // Short-circuits: reflection is only attempted once the class is known to exist.
        $isUsableFactory = class_exists($factoryClass)
            && (new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class);

        if (!$isUsableFactory) {
            throw new \LogicException(sprintf(
                'The factory_class option "%s" has to be valid class that implements "%s"',
                $factoryClass,
                AmqpConnectionFactory::class
            ));
        }

        $connectionConfig = [
            'host' => $this->settings['host'],
            'port' => $this->settings['port'],
            'user' => $this->settings['user'],
            'pass' => $this->settings['pass'],
            'vhost' => $this->settings['vhost'],
        ];

        /** @var AmqpConnectionFactory $factory */
        $factory = new $factoryClass($connectionConfig);
        $this->connectionFactory = $factory;

        if ($factory instanceof DelayStrategyAware) {
            $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
        }

        $this->context = $factory->createContext();

        // Make sure the freshly created context gets closed when the script ends.
        register_shutdown_function([$this, 'shutdown']);

        return $this->context;
    }
144
145
    /**
     * {@inheritdoc}
     *
     * Initializes every registered backend on the first call, then resolves the backend for
     * the given type: with no queues configured the first backend of type "default" wins;
     * otherwise the first backend whose type equals $type (or whose type is "" when $type is
     * "all"); failing that, the last backend whose type equals the default queue.
     *
     * @throws BackendNotFoundException if neither a matching nor a default backend exists
     */
    public function getBackend($type)
    {
        if (!$this->backendsInitialized) {
            foreach ($this->backends as $entry) {
                $entry['backend']->initialize();
            }

            $this->backendsInitialized = true;
        }

        if (0 === count($this->queues)) {
            foreach ($this->backends as $entry) {
                if ('default' === $entry['type']) {
                    return $entry['backend'];
                }
            }
        }

        $fallback = null;

        foreach ($this->backends as $entry) {
            $entryType = $entry['type'];
            $servesAllTypes = 'all' === $type && '' === $entryType;

            if ($servesAllTypes || $entryType === $type) {
                return $entry['backend'];
            }

            // Remember the default-queue backend, but keep scanning for an exact match.
            if ($entryType === $this->defaultQueue) {
                $fallback = $entry['backend'];
            }
        }

        if (null !== $fallback) {
            return $fallback;
        }

        throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
    }
187
188
    /**
     * {@inheritdoc}
     *
     * Not supported by the dispatcher itself.
     *
     * @throws \RuntimeException always
     */
    public function getIterator()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
197
198
    /**
     * {@inheritdoc}
     *
     * Not supported by the dispatcher itself; both arguments are ignored.
     *
     * @throws \RuntimeException always
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
207
208
    /**
     * {@inheritdoc}
     *
     * Loads the context, fetches the queue list from the management API and reports a Failure
     * when a configured queue is not listed by the broker, when a listed queue has zero
     * consumers, or when any exception is raised along the way.
     */
    public function getStatus()
    {
        try {
            $this->getContext();
            $brokerQueues = $this->getApiQueueStatus();
            $matched = 0;
            $withoutConsumers = [];

            foreach ($this->queues as $configured) {
                foreach ($brokerQueues as $reported) {
                    if ($reported['name'] !== $configured['queue']) {
                        continue;
                    }

                    ++$matched;

                    if (0 === $reported['consumers']) {
                        $withoutConsumers[] = $configured['queue'];
                    }
                }
            }

            // Every configured queue must have been found in the broker's list.
            if (count($this->queues) !== $matched) {
                return new Failure(
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
                    .'Are the consumer commands running?'
                );
            }

            if (count($withoutConsumers) > 0) {
                return new Failure(
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $withoutConsumers)
                );
            }
        } catch (\Exception $e) {
            return new Failure($e->getMessage());
        }

        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
    }
248
249
    /**
     * {@inheritdoc}
     *
     * Not supported by the dispatcher itself.
     *
     * @throws \RuntimeException always
     */
    public function cleanup()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
258
259
    /**
     * Closes the AMQP context if one has been created; does nothing otherwise.
     *
     * getContext() registers this method as a PHP shutdown function.
     */
    public function shutdown()
    {
        if (!$this->context) {
            return;
        }

        $this->context->close();
    }
265
266
    /**
     * {@inheritdoc}
     */
    public function initialize()
    {
        // Intentionally a no-op: this method performs no initialization work.
    }
272
273
    /**
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
     *
     * The request goes to "<console_url>/queues", authenticated with the configured user and
     * pass, using a 3000 ms connect timeout. The JSON response is decoded into an array.
     *
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
     *
     * @throws \RuntimeException if the Guzzle 3 client class is not available or the response
     *                           body does not decode to an array
     *
     * @return array
     */
    protected function getApiQueueStatus()
    {
        if (!class_exists(GuzzleClient::class)) {
            // The class checked here is Guzzle\Http\Client (Guzzle 3), which ships in the
            // guzzle/guzzle package - guzzlehttp/guzzle provides a different namespace.
            throw new \RuntimeException(
                'The guzzle http client library is required to run rabbitmq health checks. '
                .'Make sure to add guzzle/guzzle to your composer.json.'
            );
        }

        $client = new GuzzleClient();
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
        $request->setAuth($this->settings['user'], $this->settings['pass']);

        $queues = json_decode($request->send()->getBody(true), true);

        // json_decode() yields null (or a scalar) for an unexpected body; fail loudly instead
        // of handing a non-array to callers that iterate over the result.
        if (!is_array($queues)) {
            throw new \RuntimeException('The rabbitmq management api did not return a valid JSON list of queues.');
        }

        return $queues;
    }
296
}
297