Completed
Pull Request — master (#276)
by Maksim
01:50
created

AMQPBackendDispatcher::getContext()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 26
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 26
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 15
nc 4
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Guzzle\Http\Client as GuzzleClient;
17
use Interop\Amqp\AmqpContext;
18
use Interop\Amqp\AmqpConnectionFactory;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Connection\AMQPConnection;
21
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
22
use Sonata\NotificationBundle\Model\MessageInterface;
23
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
24
use ZendDiagnostics\Result\Failure;
25
use ZendDiagnostics\Result\Success;
26
27
/**
28
 * Producer side of the rabbitmq backend.
29
 */
30
class AMQPBackendDispatcher extends QueueBackendDispatcher
31
{
32
    /**
33
     * @var array
34
     */
35
    protected $settings;
36
37
    /**
38
     * @var AmqpConnectionFactory
39
     */
40
    protected $connectionFactory;
41
42
    /**
43
     * @var AmqpContext
44
     */
45
    protected $context;
46
47
    /**
48
     * @deprecated since 3.2, will be removed in 4.x
49
     *
50
     * @var AMQPChannel
51
     */
52
    protected $channel;
53
54
    /**
55
     * @deprecated since 3.2, will be removed in 4.x
56
     *
57
     * @var AMQPConnection
58
     */
59
    protected $connection;
60
61
    protected $backendsInitialized = false;
62
63
    /**
64
     * @param array  $settings
65
     * @param array  $queues
66
     * @param string $defaultQueue
67
     * @param array  $backends
68
     */
69
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
70
    {
71
        parent::__construct($queues, $defaultQueue, $backends);
72
73
        $this->settings = $settings;
74
    }
75
76
    /**
77
     * @deprecated since 3.2, will be removed in 4.x
78
     *
79
     * @return AMQPChannel
80
     */
81
    public function getChannel()
82
    {
83
        @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
84
85
        if (!$this->channel) {
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
86
            // load context
87
            $this->getContext();
88
89
            /** @var \Enqueue\AmqpLib\AmqpContext $context */
90
            $context = $this->getContext();
91
92
            $this->channel = $context->getLibChannel();
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
93
            $this->connection = $this->channel->getConnection();
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...Dispatcher::$connection has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
94
        }
95
96
        return $this->channel;
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
97
    }
98
99
    /**
100
     * @return AmqpContext
101
     */
102
    public function getContext()
103
    {
104
        if (!$this->context) {
105
            if (!class_exists(\Enqueue\AmqpLib\AmqpConnectionFactory::class)) {
106
                throw new \RuntimeException('Please install enqueue/amqp-lib:^0.8 as a dependency');
107
            }
108
            /** @var AmqpConnectionFactory $factory */
109
            $factory = new \Enqueue\AmqpLib\AmqpConnectionFactory([
110
                'host' => $this->settings['host'],
111
                'port' => $this->settings['port'],
112
                'user' => $this->settings['user'],
113
                'pass' => $this->settings['pass'],
114
                'vhost' => $this->settings['vhost'],
115
            ]);
116
117
            if ($factory instanceof DelayStrategyAware) {
118
                $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
119
            }
120
121
            $this->context = $this->connectionFactory->createContext();
122
123
            register_shutdown_function([$this, 'shutdown']);
124
        }
125
126
        return $this->context;
127
    }
128
129
    /**
130
     * {@inheritdoc}
131
     */
132
    public function getBackend($type)
133
    {
134
        if (!$this->backendsInitialized) {
135
            foreach ($this->backends as $backend) {
136
                $backend['backend']->initialize();
137
            }
138
            $this->backendsInitialized = true;
139
        }
140
141
        $default = null;
142
143
        if (0 === count($this->queues)) {
144
            foreach ($this->backends as $backend) {
145
                if ('default' === $backend['type']) {
146
                    return $backend['backend'];
147
                }
148
            }
149
        }
150
151
        foreach ($this->backends as $backend) {
152
            if ('all' === $type && '' === $backend['type']) {
153
                return $backend['backend'];
154
            }
155
156
            if ($backend['type'] === $type) {
157
                return $backend['backend'];
158
            }
159
160
            if ($backend['type'] === $this->defaultQueue) {
161
                $default = $backend['backend'];
162
            }
163
        }
164
165
        if (null === $default) {
166
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
167
        }
168
169
        return $default;
170
    }
171
172
    /**
173
     * {@inheritdoc}
174
     */
175
    public function getIterator()
176
    {
177
        throw new \RuntimeException(
178
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
179
        );
180
    }
181
182
    /**
183
     * {@inheritdoc}
184
     */
185
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
186
    {
187
        throw new \RuntimeException(
188
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
189
        );
190
    }
191
192
    /**
193
     * {@inheritdoc}
194
     */
195
    public function getStatus()
196
    {
197
        try {
198
            $this->getContext();
199
            $output = $this->getApiQueueStatus();
200
            $checked = 0;
201
            $missingConsumers = [];
202
203
            foreach ($this->queues as $queue) {
204
                foreach ($output as $q) {
205
                    if ($q['name'] === $queue['queue']) {
206
                        ++$checked;
207
                        if (0 === $q['consumers']) {
208
                            $missingConsumers[] = $queue['queue'];
209
                        }
210
                    }
211
                }
212
            }
213
214
            if ($checked !== count($this->queues)) {
215
                return new Failure(
216
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
217
                    .'Are the consumer commands running?'
218
                );
219
            }
220
221
            if (count($missingConsumers) > 0) {
222
                return new Failure(
223
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
224
                );
225
            }
226
        } catch (\Exception $e) {
227
            return new Failure($e->getMessage());
228
        }
229
230
        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
231
    }
232
233
    /**
234
     * {@inheritdoc}
235
     */
236
    public function cleanup()
237
    {
238
        throw new \RuntimeException(
239
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
240
        );
241
    }
242
243
    public function shutdown()
244
    {
245
        if ($this->context) {
246
            $this->context->close();
247
        }
248
    }
249
250
    /**
251
     * {@inheritdoc}
252
     */
253
    public function initialize()
254
    {
255
    }
256
257
    /**
258
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
259
     *
260
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
261
     *
262
     * @return array
263
     */
264
    protected function getApiQueueStatus()
265
    {
266
        if (!class_exists(GuzzleClient::class)) {
267
            throw new \RuntimeException(
268
                'The guzzle http client library is required to run rabbitmq health checks. '
269
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
270
            );
271
        }
272
273
        $client = new GuzzleClient();
274
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
275
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
276
        $request->setAuth($this->settings['user'], $this->settings['pass']);
277
278
        return json_decode($request->send()->getBody(true), true);
279
    }
280
}
281