Completed
Pull Request — 3.x (#387)
by
unknown
03:04
created

AMQPBackendDispatcher   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 274
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Importance

Changes 0
Metric Value
wmc 40
lcom 1
cbo 10
dl 0
loc 274
rs 9.2
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A getChannel() 0 21 3
B getContext() 0 38 7
C getBackend() 0 39 12
A getIterator() 0 6 1
A handle() 0 6 1
B getStatus() 0 37 8
A cleanup() 0 6 1
A shutdown() 0 6 2
A initialize() 0 3 1
A setDelayStrategy() 0 4 1
A getApiQueueStatus() 0 16 2

How to fix   Complexity   

Complex Class

Complex classes like AMQPBackendDispatcher often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AMQPBackendDispatcher, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Backend;
15
16
use Enqueue\AmqpTools\DelayStrategy;
17
use Enqueue\AmqpTools\DelayStrategyAware;
18
use Guzzle\Http\Client as GuzzleClient;
19
use Interop\Amqp\AmqpConnectionFactory;
20
use Interop\Amqp\AmqpContext;
21
use PhpAmqpLib\Channel\AMQPChannel;
22
use PhpAmqpLib\Connection\AMQPConnection;
23
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
24
use Sonata\NotificationBundle\Model\MessageInterface;
25
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26
use ZendDiagnostics\Result\Failure;
27
use ZendDiagnostics\Result\Success;
28
29
/**
30
 * Producer side of the rabbitmq backend.
31
 */
32
class AMQPBackendDispatcher extends QueueBackendDispatcher
33
{
34
    /**
35
     * @var array
36
     */
37
    protected $settings;
38
39
    /**
40
     * @deprecated since 3.2, will be removed in 4.x
41
     *
42
     * @var AMQPChannel
43
     */
44
    protected $channel;
45
46
    /**
47
     * @deprecated since 3.2, will be removed in 4.x
48
     *
49
     * @var AMQPConnection
50
     */
51
    protected $connection;
52
53
    protected $backendsInitialized = false;
54
55
    /**
56
     * @var AmqpConnectionFactory
57
     */
58
    private $connectionFactory;
59
60
    /**
61
     * @var AmqpContext
62
     */
63
    private $context;
64
65
    /**
66
     * @var DelayStrategy
67
     */
68
    private $delayStrategy = null;
69
70
    /**
71
     * @param string $defaultQueue
72
     */
73
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
74
    {
75
        parent::__construct($queues, $defaultQueue, $backends);
76
77
        $this->settings = $settings;
78
    }
79
80
    /**
81
     * @deprecated since 3.2, will be removed in 4.x
82
     *
83
     * @return AMQPChannel
84
     */
85
    public function getChannel()
86
    {
87
        @trigger_error(sprintf('The method %s is deprecated since version 3.3 and will be removed in 4.0. Use %s::getContext() instead.', __METHOD__, __CLASS__), E_USER_DEPRECATED);
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
88
89
        if (!$this->channel) {
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
90
            if (!$this->context instanceof \Enqueue\AmqpLib\AmqpContext) {
91
                throw new \LogicException('The BC layer works only if enqueue/amqp-lib lib is being used.');
92
            }
93
94
            // load context
95
            $this->getContext();
96
97
            /** @var \Enqueue\AmqpLib\AmqpContext $context */
98
            $context = $this->getContext();
99
100
            $this->channel = $context->getLibChannel();
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
101
            $this->connection = $this->channel->getConnection();
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...Dispatcher::$connection has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
102
        }
103
104
        return $this->channel;
0 ignored issues
show
Deprecated Code introduced by
The property Sonata\NotificationBundl...endDispatcher::$channel has been deprecated with message: since 3.2, will be removed in 4.x

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
105
    }
106
107
    /**
108
     * @return AmqpContext
109
     */
110
    final public function getContext()
111
    {
112
        if (!$this->context) {
113
            if (!\array_key_exists('factory_class', $this->settings)) {
114
                throw new \LogicException('The factory_class option is missing though it is required.');
115
            }
116
            $factoryClass = $this->settings['factory_class'];
117
            if (
118
                !class_exists($factoryClass) ||
119
                !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
120
            ) {
121
                throw new \LogicException(sprintf(
122
                    'The factory_class option "%s" has to be valid class that implements "%s"',
123
                    $factoryClass,
124
                    AmqpConnectionFactory::class
125
                ));
126
            }
127
128
            /* @var AmqpConnectionFactory $factory */
129
            $this->connectionFactory = $factory = new $factoryClass([
130
                'host' => $this->settings['host'],
131
                'port' => $this->settings['port'],
132
                'user' => $this->settings['user'],
133
                'pass' => $this->settings['pass'],
134
                'vhost' => $this->settings['vhost'],
135
            ]);
136
137
            if ($factory instanceof DelayStrategyAware && $this->delayStrategy instanceof DelayStrategy) {
138
                $factory->setDelayStrategy($this->delayStrategy);
139
            }
140
141
            $this->context = $factory->createContext();
142
143
            register_shutdown_function([$this, 'shutdown']);
144
        }
145
146
        return $this->context;
147
    }
148
149
    /**
150
     * {@inheritdoc}
151
     */
152
    public function getBackend($type)
153
    {
154
        if (!$this->backendsInitialized) {
155
            foreach ($this->backends as $backend) {
156
                $backend['backend']->initialize();
157
            }
158
            $this->backendsInitialized = true;
159
        }
160
161
        $default = null;
162
163
        if (0 === \count($this->queues)) {
164
            foreach ($this->backends as $backend) {
165
                if ('default' === $backend['type']) {
166
                    return $backend['backend'];
167
                }
168
            }
169
        }
170
171
        foreach ($this->backends as $backend) {
172
            if ('all' === $type && '' === $backend['type']) {
173
                return $backend['backend'];
174
            }
175
176
            if ($backend['type'] === $type) {
177
                return $backend['backend'];
178
            }
179
180
            if ($backend['type'] === $this->defaultQueue) {
181
                $default = $backend['backend'];
182
            }
183
        }
184
185
        if (null === $default) {
186
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
187
        }
188
189
        return $default;
190
    }
191
192
    /**
193
     * {@inheritdoc}
194
     */
195
    public function getIterator()
196
    {
197
        throw new \RuntimeException(
198
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
199
        );
200
    }
201
202
    /**
203
     * {@inheritdoc}
204
     */
205
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
206
    {
207
        throw new \RuntimeException(
208
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
209
        );
210
    }
211
212
    /**
213
     * {@inheritdoc}
214
     */
215
    public function getStatus()
216
    {
217
        try {
218
            $this->getContext();
219
            $output = $this->getApiQueueStatus();
220
            $checked = 0;
221
            $missingConsumers = [];
222
223
            foreach ($this->queues as $queue) {
224
                foreach ($output as $q) {
225
                    if ($q['name'] === $queue['queue']) {
226
                        ++$checked;
227
                        if (0 === $q['consumers']) {
228
                            $missingConsumers[] = $queue['queue'];
229
                        }
230
                    }
231
                }
232
            }
233
234
            if ($checked !== \count($this->queues)) {
235
                return new Failure(
236
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
237
                    .'Are the consumer commands running?'
238
                );
239
            }
240
241
            if (\count($missingConsumers) > 0) {
242
                return new Failure(
243
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
244
                );
245
            }
246
        } catch (\Exception $e) {
247
            return new Failure($e->getMessage());
248
        }
249
250
        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
251
    }
252
253
    /**
254
     * {@inheritdoc}
255
     */
256
    public function cleanup()
257
    {
258
        throw new \RuntimeException(
259
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
260
        );
261
    }
262
263
    public function shutdown()
264
    {
265
        if ($this->context) {
266
            $this->context->close();
267
        }
268
    }
269
270
    /**
271
     * {@inheritdoc}
272
     */
273
    public function initialize()
274
    {
275
    }
276
277
    public function setDelayStrategy(DelayStrategy $delayStrategy): void
278
    {
279
        $this->delayStrategy = $delayStrategy;
280
    }
281
282
    /**
283
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
284
     *
285
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
286
     *
287
     * @return array
288
     */
289
    protected function getApiQueueStatus()
290
    {
291
        if (!class_exists(GuzzleClient::class)) {
292
            throw new \RuntimeException(
293
                'The guzzle http client library is required to run rabbitmq health checks. '
294
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
295
            );
296
        }
297
298
        $client = new GuzzleClient();
299
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
300
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
301
        $request->setAuth($this->settings['user'], $this->settings['pass']);
302
303
        return json_decode($request->send()->getBody(true), true);
304
    }
305
}
306