Completed
Pull Request — 3.x (#387)
by
unknown
01:36
created

AMQPBackendDispatcher::setDelayStrategy()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Backend;
15
16
use Enqueue\AmqpTools\DelayStrategy;
17
use Enqueue\AmqpTools\DelayStrategyAware;
18
use Guzzle\Http\Client as GuzzleClient;
19
use Interop\Amqp\AmqpConnectionFactory;
20
use Interop\Amqp\AmqpContext;
21
use PhpAmqpLib\Channel\AMQPChannel;
22
use PhpAmqpLib\Connection\AMQPConnection;
23
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
24
use Sonata\NotificationBundle\Model\MessageInterface;
25
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26
use ZendDiagnostics\Result\Failure;
27
use ZendDiagnostics\Result\Success;
28
29
/**
30
 * Producer side of the rabbitmq backend.
31
 */
32
class AMQPBackendDispatcher extends QueueBackendDispatcher
33
{
34
    /**
35
     * @var array
36
     */
37
    protected $settings;
38
39
    /**
40
     * @deprecated since 3.2, will be removed in 4.x
41
     *
42
     * @var AMQPChannel
43
     */
44
    protected $channel;
45
46
    /**
47
     * @deprecated since 3.2, will be removed in 4.x
48
     *
49
     * @var AMQPConnection
50
     */
51
    protected $connection;
52
53
    protected $backendsInitialized = false;
54
55
    /**
56
     * @var AmqpConnectionFactory
57
     */
58
    private $connectionFactory;
59
60
    /**
61
     * @var AmqpContext
62
     */
63
    private $context;
64
65
    private $delayStrategy = null;
66
67
    /**
     * Keeps the connection settings on the instance and passes the queue
     * configuration on to the parent dispatcher.
     *
     * @param array  $settings     Settings stored in $this->settings (read later when the context is built)
     * @param array  $queues       Forwarded unchanged to the parent constructor
     * @param string $defaultQueue Forwarded unchanged to the parent constructor
     * @param array  $backends     Forwarded unchanged to the parent constructor
     */
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
    {
        // The parent is constructed first; the order is kept as-is on purpose.
        parent::__construct($queues, $defaultQueue, $backends);

        $this->settings = $settings;
    }
76
77
    /**
     * Returns the php-amqplib channel behind the current context (backward-compatibility layer).
     *
     * The channel and its connection are resolved on the first call and cached on the instance.
     *
     * @deprecated since 3.2, will be removed in 4.x
     *
     * @throws \LogicException when the context is not an enqueue/amqp-lib context
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        // The "@" operator keeps the deprecation notice silent under default error reporting.
        @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);

        if (!$this->channel) {
            // The context is created lazily, so it must be loaded BEFORE its type is inspected.
            // Inspecting $this->context directly would reject every call made before getContext().
            $context = $this->getContext();

            if (!$context instanceof \Enqueue\AmqpLib\AmqpContext) {
                throw new \LogicException('The BC layer works only if enqueue/amqp-lib lib is being used.');
            }

            $this->channel = $context->getLibChannel();
            $this->connection = $this->channel->getConnection();
        }

        return $this->channel;
    }
103
104
    /**
     * Returns the AMQP context, building it on the first call.
     *
     * On creation the configured factory class is validated and instantiated with the
     * host/port/user/pass/vhost settings, the delay strategy is applied when both the factory
     * and the strategy support it, and shutdown() is registered as a PHP shutdown function.
     *
     * @throws \LogicException when the factory_class setting is missing or is not a class
     *                         implementing AmqpConnectionFactory
     *
     * @return AmqpContext
     */
    final public function getContext()
    {
        // Already built: reuse it.
        if ($this->context) {
            return $this->context;
        }

        if (!\array_key_exists('factory_class', $this->settings)) {
            throw new \LogicException('The factory_class option is missing though it is required.');
        }

        $factoryClass = $this->settings['factory_class'];
        $isUsableFactory = class_exists($factoryClass)
            && (new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class);

        if (!$isUsableFactory) {
            throw new \LogicException(sprintf(
                'The factory_class option "%s" has to be valid class that implements "%s"',
                $factoryClass,
                AmqpConnectionFactory::class
            ));
        }

        $connectionConfig = [
            'host' => $this->settings['host'],
            'port' => $this->settings['port'],
            'user' => $this->settings['user'],
            'pass' => $this->settings['pass'],
            'vhost' => $this->settings['vhost'],
        ];

        /** @var AmqpConnectionFactory $factory */
        $factory = new $factoryClass($connectionConfig);
        $this->connectionFactory = $factory;

        if ($this->delayStrategy instanceof DelayStrategy && $factory instanceof DelayStrategyAware) {
            $factory->setDelayStrategy($this->delayStrategy);
        }

        $this->context = $factory->createContext();

        // Make sure the context gets closed when the PHP process ends.
        register_shutdown_function([$this, 'shutdown']);

        return $this->context;
    }
145
146
    /**
     * {@inheritdoc}
     *
     * Initializes every registered backend on the first call, then resolves the backend
     * for $type, falling back to the backend whose type equals the default queue.
     *
     * @throws BackendNotFoundException when neither a matching nor a default backend exists
     */
    public function getBackend($type)
    {
        if (!$this->backendsInitialized) {
            foreach ($this->backends as $entry) {
                $entry['backend']->initialize();
            }
            $this->backendsInitialized = true;
        }

        // With no queue configured, the first backend typed "default" is returned.
        if (0 === \count($this->queues)) {
            foreach ($this->backends as $entry) {
                if ('default' === $entry['type']) {
                    return $entry['backend'];
                }
            }
        }

        $fallback = null;

        foreach ($this->backends as $entry) {
            $entryType = $entry['type'];
            $isCatchAll = 'all' === $type && '' === $entryType;

            if ($isCatchAll || $entryType === $type) {
                return $entry['backend'];
            }

            // Remember the default-queue backend; the last match wins.
            if ($entryType === $this->defaultQueue) {
                $fallback = $entry['backend'];
            }
        }

        if (null !== $fallback) {
            return $fallback;
        }

        throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
    }
188
189
    /**
     * {@inheritdoc}
     *
     * Always throws: this dispatcher is the producer side and cannot be consumed from.
     *
     * @throws \RuntimeException on every call
     */
    public function getIterator()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
198
199
    /**
     * {@inheritdoc}
     *
     * Always throws: this dispatcher is the producer side and does not handle messages.
     *
     * @throws \RuntimeException on every call
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
208
209
    /**
     * {@inheritdoc}
     *
     * Compares the configured queues with the queue list reported by the rabbitmq management
     * api. Returns a Failure when a configured queue is not reported, when a reported queue has
     * no consumers, or when any exception is raised along the way; otherwise a Success.
     */
    public function getStatus()
    {
        try {
            $this->getContext();
            $brokerQueues = $this->getApiQueueStatus();
            $matched = 0;
            $withoutConsumers = [];

            foreach ($this->queues as $configured) {
                foreach ($brokerQueues as $reported) {
                    if ($reported['name'] !== $configured['queue']) {
                        continue;
                    }

                    ++$matched;

                    if (0 === $reported['consumers']) {
                        $withoutConsumers[] = $configured['queue'];
                    }
                }
            }

            if (\count($this->queues) !== $matched) {
                return new Failure(
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
                    .'Are the consumer commands running?'
                );
            }

            if ([] !== $withoutConsumers) {
                return new Failure(
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $withoutConsumers)
                );
            }
        } catch (\Exception $e) {
            // Any failure while talking to the broker or its api is reported as a failed check.
            return new Failure($e->getMessage());
        }

        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
    }
249
250
    /**
     * {@inheritdoc}
     *
     * Always throws: this dispatcher is the producer side and has nothing to clean up itself.
     *
     * @throws \RuntimeException on every call
     */
    public function cleanup()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
259
260
    /**
     * Closes the AMQP context if one has been created.
     *
     * getContext() registers this method as a PHP shutdown function.
     */
    public function shutdown()
    {
        if (!$this->context) {
            return;
        }

        $this->context->close();
    }
266
267
    /**
     * {@inheritdoc}
     */
    public function initialize()
    {
        // No-op: nothing is set up here.
    }
273
274
    /**
     * Sets the delay strategy to hand to the connection factory.
     *
     * getContext() applies the strategy only while it creates the context, and only when the
     * factory implements DelayStrategyAware, so call this before the first getContext() call.
     */
    public function setDelayStrategy(DelayStrategy $delayStrategy): void
    {
        $this->delayStrategy = $delayStrategy;
    }
278
279
    /**
     * Calls the rabbitmq management api queues endpoint ("<console_url>/queues") to list the
     * available queues, authenticating with the configured user and password.
     *
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
     *
     * @throws \RuntimeException when the guzzle client class is not available or the response
     *                           body does not decode to an array
     *
     * @return array
     */
    protected function getApiQueueStatus()
    {
        if (!class_exists(GuzzleClient::class)) {
            throw new \RuntimeException(
                'The guzzle http client library is required to run rabbitmq health checks. '
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
            );
        }

        $client = new GuzzleClient();
        // Give up connecting after 3 seconds.
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
        $request->setAuth($this->settings['user'], $this->settings['pass']);

        $queues = json_decode($request->send()->getBody(true), true);

        // json_decode() yields null for an empty or malformed body; fail loudly instead of
        // returning a non-array that callers would try to iterate.
        if (!\is_array($queues)) {
            throw new \RuntimeException('The rabbitmq management api did not return a valid JSON queue list.');
        }

        return $queues;
    }
302
}
303