Completed
Pull Request — 3.x (#387)
by
unknown
01:53
created

AMQPBackendDispatcher::resolveDelayStrategy()   A

Complexity

Conditions 4
Paths 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 19
rs 9.6333
c 0
b 0
f 0
cc 4
nc 2
nop 1
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Backend;
15
16
use Enqueue\AmqpTools\DelayStrategy;
17
use Enqueue\AmqpTools\DelayStrategyAware;
18
use Guzzle\Http\Client as GuzzleClient;
19
use Interop\Amqp\AmqpConnectionFactory;
20
use Interop\Amqp\AmqpContext;
21
use PhpAmqpLib\Channel\AMQPChannel;
22
use PhpAmqpLib\Connection\AMQPConnection;
23
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
24
use Sonata\NotificationBundle\Model\MessageInterface;
25
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26
use ZendDiagnostics\Result\Failure;
27
use ZendDiagnostics\Result\Success;
28
29
/**
30
 * Producer side of the rabbitmq backend.
31
 */
32
class AMQPBackendDispatcher extends QueueBackendDispatcher
33
{
34
    /**
35
     * @var array
36
     */
37
    protected $settings;
38
39
    /**
40
     * @deprecated since 3.2, will be removed in 4.x
41
     *
42
     * @var AMQPChannel
43
     */
44
    protected $channel;
45
46
    /**
47
     * @deprecated since 3.2, will be removed in 4.x
48
     *
49
     * @var AMQPConnection
50
     */
51
    protected $connection;
52
53
    protected $backendsInitialized = false;
54
55
    /**
56
     * @var AmqpConnectionFactory
57
     */
58
    private $connectionFactory;
59
60
    /**
61
     * @var AmqpContext
62
     */
63
    private $context;
64
65
    /**
66
     * @param string $defaultQueue
67
     */
68
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
69
    {
70
        parent::__construct($queues, $defaultQueue, $backends);
71
72
        $this->settings = $settings;
73
    }
74
75
    /**
76
     * @deprecated since 3.2, will be removed in 4.x
77
     *
78
     * @return AMQPChannel
79
     */
80
    public function getChannel()
81
    {
82
        @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
83
84
        if (!$this->channel) {
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
85
            if (!$this->context instanceof \Enqueue\AmqpLib\AmqpContext) {
86
                throw new \LogicException('The BC layer works only if enqueue/amqp-lib lib is being used.');
87
            }
88
89
            // load context
90
            $this->getContext();
91
92
            /** @var \Enqueue\AmqpLib\AmqpContext $context */
93
            $context = $this->getContext();
94
95
            $this->channel = $context->getLibChannel();
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
96
            $this->connection = $this->channel->getConnection();
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...Dispatcher::$connection has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
97
        }
98
99
        return $this->channel;
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
100
    }
101
102
    /**
103
     * @return AmqpContext
104
     */
105
    final public function getContext()
106
    {
107
        if (!$this->context) {
108
            if (!\array_key_exists('factory_class', $this->settings)) {
109
                throw new \LogicException('The factory_class option is missing though it is required.');
110
            }
111
            $factoryClass = $this->settings['factory_class'];
112
            if (
113
                !class_exists($factoryClass) ||
114
                !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
115
            ) {
116
                throw new \LogicException(sprintf(
117
                    'The factory_class option "%s" has to be valid class that implements "%s"',
118
                    $factoryClass,
119
                    AmqpConnectionFactory::class
120
                ));
121
            }
122
123
            /* @var AmqpConnectionFactory $factory */
124
            $this->connectionFactory = $factory = new $factoryClass([
125
                'host' => $this->settings['host'],
126
                'port' => $this->settings['port'],
127
                'user' => $this->settings['user'],
128
                'pass' => $this->settings['pass'],
129
                'vhost' => $this->settings['vhost'],
130
            ]);
131
132
            if ($factory instanceof DelayStrategyAware) {
133
                $factory = $this->resolveDelayStrategy($factory);
134
            }
135
136
            $this->context = $factory->createContext();
137
138
            register_shutdown_function([$this, 'shutdown']);
139
        }
140
141
        return $this->context;
142
    }
143
144
    /**
145
     * {@inheritdoc}
146
     */
147
    public function getBackend($type)
148
    {
149
        if (!$this->backendsInitialized) {
150
            foreach ($this->backends as $backend) {
151
                $backend['backend']->initialize();
152
            }
153
            $this->backendsInitialized = true;
154
        }
155
156
        $default = null;
157
158
        if (0 === \count($this->queues)) {
159
            foreach ($this->backends as $backend) {
160
                if ('default' === $backend['type']) {
161
                    return $backend['backend'];
162
                }
163
            }
164
        }
165
166
        foreach ($this->backends as $backend) {
167
            if ('all' === $type && '' === $backend['type']) {
168
                return $backend['backend'];
169
            }
170
171
            if ($backend['type'] === $type) {
172
                return $backend['backend'];
173
            }
174
175
            if ($backend['type'] === $this->defaultQueue) {
176
                $default = $backend['backend'];
177
            }
178
        }
179
180
        if (null === $default) {
181
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
182
        }
183
184
        return $default;
185
    }
186
187
    /**
188
     * {@inheritdoc}
189
     */
190
    public function getIterator()
191
    {
192
        throw new \RuntimeException(
193
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
194
        );
195
    }
196
197
    /**
198
     * {@inheritdoc}
199
     */
200
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
201
    {
202
        throw new \RuntimeException(
203
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
204
        );
205
    }
206
207
    /**
208
     * {@inheritdoc}
209
     */
210
    public function getStatus()
211
    {
212
        try {
213
            $this->getContext();
214
            $output = $this->getApiQueueStatus();
215
            $checked = 0;
216
            $missingConsumers = [];
217
218
            foreach ($this->queues as $queue) {
219
                foreach ($output as $q) {
220
                    if ($q['name'] === $queue['queue']) {
221
                        ++$checked;
222
                        if (0 === $q['consumers']) {
223
                            $missingConsumers[] = $queue['queue'];
224
                        }
225
                    }
226
                }
227
            }
228
229
            if ($checked !== \count($this->queues)) {
230
                return new Failure(
231
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
232
                    .'Are the consumer commands running?'
233
                );
234
            }
235
236
            if (\count($missingConsumers) > 0) {
237
                return new Failure(
238
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
239
                );
240
            }
241
        } catch (\Exception $e) {
242
            return new Failure($e->getMessage());
243
        }
244
245
        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
246
    }
247
248
    /**
249
     * {@inheritdoc}
250
     */
251
    public function cleanup()
252
    {
253
        throw new \RuntimeException(
254
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
255
        );
256
    }
257
258
    public function shutdown()
259
    {
260
        if ($this->context) {
261
            $this->context->close();
262
        }
263
    }
264
265
    /**
266
     * {@inheritdoc}
267
     */
268
    public function initialize()
269
    {
270
    }
271
272
    /**
273
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
274
     *
275
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
276
     *
277
     * @return array
278
     */
279
    protected function getApiQueueStatus()
280
    {
281
        if (!class_exists(GuzzleClient::class)) {
282
            throw new \RuntimeException(
283
                'The guzzle http client library is required to run rabbitmq health checks. '
284
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
285
            );
286
        }
287
288
        $client = new GuzzleClient();
289
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
290
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
291
        $request->setAuth($this->settings['user'], $this->settings['pass']);
292
293
        return json_decode($request->send()->getBody(true), true);
294
    }
295
296
    protected function resolveDelayStrategy(DelayStrategyAware $factory): DelayStrategyAware
297
    {
298
        $delayStrategy = $this->settings['delay_strategy_class'];
299
        if (
300
            !empty($delayStrategy) &&
301
            (
302
                !class_exists($delayStrategy) ||
303
                !(new \ReflectionClass($delayStrategy))->implementsInterface(DelayStrategy::class)
304
            )
305
        ) {
306
            throw new \LogicException(sprintf(
307
                'The delay_strategy_class option "%s" has to be a valid class that implements "%s" or null to ignore',
308
                $delayStrategy,
309
                DelayStrategy::class
310
            ));
311
        }
312
313
        return $factory->setDelayStrategy(new $delayStrategy());
314
    }
315
}
316