Completed
Pull Request — 3.x (#336)
by
unknown
04:29
created

AMQPBackendDispatcher::setStatusProvider()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Interop\Amqp\AmqpConnectionFactory;
17
use Interop\Amqp\AmqpContext;
18
use PhpAmqpLib\Channel\AMQPChannel;
19
use PhpAmqpLib\Connection\AMQPConnection;
20
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
21
use Sonata\NotificationBundle\Exception\MonitoringException;
22
use Sonata\NotificationBundle\Model\MessageInterface;
23
use Sonata\NotificationBundle\Service\RabbitMQQueueStatusProviderInterface;
24
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
25
use ZendDiagnostics\Result\Failure;
26
use ZendDiagnostics\Result\Success;
27
28
/**
29
 * Producer side of the rabbitmq backend.
30
 */
31
class AMQPBackendDispatcher extends QueueBackendDispatcher
32
{
33
    /**
34
     * @var array
35
     */
36
    protected $settings;
37
38
    /**
39
     * @deprecated since 3.2, will be removed in 4.x
40
     *
41
     * @var AMQPChannel
42
     */
43
    protected $channel;
44
45
    /**
46
     * @deprecated since 3.2, will be removed in 4.x
47
     *
48
     * @var AMQPConnection
49
     */
50
    protected $connection;
51
52
    protected $backendsInitialized = false;
53
54
    /**
55
     * @var RabbitMQQueueStatusProviderInterface
56
     */
57
    private $statusProvider;
58
59
    /**
60
     * @var AmqpConnectionFactory
61
     */
62
    private $connectionFactory;
63
64
    /**
65
     * @var AmqpContext
66
     */
67
    private $context;
68
69
    /**
     * Builds the dispatcher.
     *
     * The queue list, default queue and backends are handed to the parent
     * dispatcher; the settings are kept on this instance and read later by
     * getContext() (factory class and connection parameters).
     *
     * @param array  $settings     Settings read by getContext(): factory_class, host, port, user, pass, vhost
     * @param array  $queues       Queue definitions, forwarded to the parent constructor
     * @param string $defaultQueue Default queue, forwarded to the parent constructor
     * @param array  $backends     Backend entries, forwarded to the parent constructor
     */
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
    {
        parent::__construct($queues, $defaultQueue, $backends);

        // Only stored here; nothing is connected until getContext() is called.
        $this->settings = $settings;
    }
81
82
    /**
     * Returns the php-amqplib channel of the current context (BC layer).
     *
     * The context is created on demand through getContext(). The channel and
     * its connection are then cached on the deprecated $channel and
     * $connection properties, so later calls return the same channel.
     *
     * @deprecated since 3.2, will be removed in 4.x
     *
     * @throws \LogicException if the context is not an enqueue/amqp-lib context
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        // The "@" is intentional: the deprecation notice is raised silenced
        // instead of surfacing as a visible error by default.
        @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);

        if (!$this->channel) {
            // The context is created lazily, so it must be loaded BEFORE its
            // type is inspected. Checking $this->context first would always
            // throw when getContext() had not been called yet.
            $context = $this->getContext();

            if (!$context instanceof \Enqueue\AmqpLib\AmqpContext) {
                throw new \LogicException('The BC layer works only if enqueue/amqp-lib lib is being used.');
            }

            $this->channel = $context->getLibChannel();
            $this->connection = $this->channel->getConnection();
        }

        return $this->channel;
    }
108
109
    /**
     * Returns the AMQP context, creating it on first use.
     *
     * On the first call the "factory_class" setting is validated and
     * instantiated with the host, port, user, pass and vhost settings. A
     * RabbitMqDlxDelayStrategy is set on factories that are
     * DelayStrategyAware, and shutdown() is registered as a PHP shutdown
     * function. Later calls return the stored context.
     *
     * @throws \LogicException if factory_class is missing, or is not an
     *                         existing class implementing AmqpConnectionFactory
     *
     * @return AmqpContext
     */
    final public function getContext()
    {
        // Already created: nothing to do.
        if ($this->context) {
            return $this->context;
        }

        if (!array_key_exists('factory_class', $this->settings)) {
            throw new \LogicException('The factory_class option is missing though it is required.');
        }

        $factoryClass = $this->settings['factory_class'];

        // The reflection lookup only runs when the class exists (short-circuit).
        $isConnectionFactory = class_exists($factoryClass)
            && (new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class);

        if (!$isConnectionFactory) {
            $message = sprintf(
                'The factory_class option "%s" has to be valid class that implements "%s"',
                $factoryClass,
                AmqpConnectionFactory::class
            );

            throw new \LogicException($message);
        }

        $connectionConfig = [
            'host' => $this->settings['host'],
            'port' => $this->settings['port'],
            'user' => $this->settings['user'],
            'pass' => $this->settings['pass'],
            'vhost' => $this->settings['vhost'],
        ];

        /* @var AmqpConnectionFactory $factory */
        $factory = new $factoryClass($connectionConfig);
        $this->connectionFactory = $factory;

        if ($factory instanceof DelayStrategyAware) {
            $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
        }

        $this->context = $factory->createContext();

        // Close the freshly created context when the PHP process ends.
        register_shutdown_function([$this, 'shutdown']);

        return $this->context;
    }
150
151
    /**
     * {@inheritdoc}
     *
     * Initializes every registered backend on the first call, then resolves
     * the backend for the given type:
     *  - with no queues configured, the backend registered as "default";
     *  - otherwise the first backend whose type equals $type, or whose type is
     *    the empty string when $type is "all";
     *  - otherwise the backend registered for the default queue.
     *
     * @throws BackendNotFoundException if no backend could be resolved
     */
    public function getBackend($type)
    {
        if (!$this->backendsInitialized) {
            foreach ($this->backends as $entry) {
                $entry['backend']->initialize();
            }
            $this->backendsInitialized = true;
        }

        if (0 === count($this->queues)) {
            foreach ($this->backends as $entry) {
                if ('default' === $entry['type']) {
                    return $entry['backend'];
                }
            }
        }

        $fallback = null;

        foreach ($this->backends as $entry) {
            $entryType = $entry['type'];
            $isCatchAll = 'all' === $type && '' === $entryType;

            if ($isCatchAll || $entryType === $type) {
                return $entry['backend'];
            }

            // Remember the default queue's backend in case nothing matches.
            if ($entryType === $this->defaultQueue) {
                $fallback = $entry['backend'];
            }
        }

        if (null !== $fallback) {
            return $fallback;
        }

        throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
    }
193
194
    /**
     * {@inheritdoc}
     *
     * Not supported by the dispatcher: this method always throws.
     *
     * @throws \RuntimeException always
     */
    public function getIterator()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
203
204
    /**
     * {@inheritdoc}
     *
     * Not supported by the dispatcher: both arguments are ignored and the
     * method always throws.
     *
     * @throws \RuntimeException always
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
213
214
    /**
     * {@inheritdoc}
     *
     * Reports a Failure when:
     *  - no status provider has been set;
     *  - the status provider does not return a traversable list of queues;
     *  - a configured queue is not present in the provider's list;
     *  - a configured queue has zero consumers;
     *  - a MonitoringException is thrown while collecting the status.
     * Otherwise a Success is returned.
     */
    public function getStatus()
    {
        if (null === $this->statusProvider) {
            return new Failure(
                sprintf(
                    'Service that implements interface %s is not available. Can not get RabbitMQ status',
                    RabbitMQQueueStatusProviderInterface::class
                )
            );
        }

        try {
            // Make sure the AMQP context has been created before asking for the status.
            $this->getContext();
            $output = $this->statusProvider->getApiQueueStatus();

            // The provider may return null. Iterating that would raise a
            // warning and, with no configured queues, wrongly report success.
            if (!is_array($output) && !$output instanceof \Traversable) {
                return new Failure('The RabbitMQ status provider did not return a list of queues.');
            }

            $checked = 0;
            $missingConsumers = [];

            foreach ($this->queues as $queue) {
                foreach ($output as $q) {
                    if ($q['name'] === $queue['queue']) {
                        ++$checked;
                        if (0 === $q['consumers']) {
                            $missingConsumers[] = $queue['queue'];
                        }
                    }
                }
            }

            // Every configured queue must have been matched in the provider's list.
            if ($checked !== count($this->queues)) {
                return new Failure(
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
                    .'Are the consumer commands running?'
                );
            }

            if (count($missingConsumers) > 0) {
                return new Failure(
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
                );
            }
        } catch (MonitoringException $e) {
            return new Failure($e->getMessage());
        }

        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
    }
264
265
    /**
     * {@inheritdoc}
     *
     * Not supported by the dispatcher: this method always throws.
     *
     * @throws \RuntimeException always
     */
    public function cleanup()
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
274
275
    /**
     * Closes the AMQP context if one has been created.
     *
     * getContext() registers this method as a PHP shutdown function.
     */
    public function shutdown()
    {
        // No context was ever created: nothing to close.
        if (!$this->context) {
            return;
        }

        $this->context->close();
    }
281
282
    /**
     * {@inheritdoc}
     *
     * Intentionally a no-op for the dispatcher.
     */
    public function initialize()
    {
        // Nothing to initialize here.
    }
288
289
    /**
     * Sets the provider that getStatus() queries for the queue status.
     *
     * While no provider is set, getStatus() returns a Failure.
     *
     * @param RabbitMQQueueStatusProviderInterface $statusProvider
     */
    public function setStatusProvider(RabbitMQQueueStatusProviderInterface $statusProvider)
    {
        $this->statusProvider = $statusProvider;
    }
293
}
294