Completed
Pull Request — master (#276)
by Maksim
06:30
created

AMQPBackendDispatcher::getChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Interop\Amqp\AmqpContext;
17
use Interop\Amqp\AmqpConnectionFactory;
18
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
19
use Sonata\NotificationBundle\Model\MessageInterface;
20
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
21
use ZendDiagnostics\Result\Failure;
22
use ZendDiagnostics\Result\Success;
23
24
/**
25
 * Producer side of the rabbitmq backend.
26
 */
27
class AMQPBackendDispatcher extends QueueBackendDispatcher
28
{
29
    /**
30
     * @var array
31
     */
32
    protected $settings;
33
34
    /**
35
     * @var AmqpConnectionFactory
36
     */
37
    protected $connectionFactory;
38
39
    /**
40
     * @var AmqpContext
41
     */
42
    protected $context;
43
44
    protected $backendsInitialized = false;
45
46
    /**
     * Hands the queue configuration to the parent dispatcher and keeps the connection settings.
     *
     * @param array  $settings     Connection settings; this class reads the keys "factory_class", "host",
     *                             "port", "user", "pass", "vhost" and "console_url" from it
     * @param array  $queues       Forwarded unchanged to the parent constructor
     * @param string $defaultQueue Forwarded unchanged to the parent constructor
     * @param array  $backends     Forwarded unchanged to the parent constructor
     */
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
    {
        parent::__construct($queues, $defaultQueue, $backends);

        $this->settings = $settings;
    }
58
59
    /**
     * Returns the AMQP context, creating it on first use.
     *
     * On the first call the class named by the "factory_class" setting is instantiated with the "host",
     * "port", "user", "pass" and "vhost" settings, a context is created from that factory, and shutdown()
     * is registered as a PHP shutdown function. Later calls return the context created by the first call.
     *
     * @throws \LogicException When the "factory_class" setting is missing, does not name an existing class,
     *                         or names a class that does not implement AmqpConnectionFactory
     *
     * @return AmqpContext
     */
    public function getContext()
    {
        if ($this->context) {
            return $this->context;
        }

        if (false === array_key_exists('factory_class', $this->settings)) {
            throw new \LogicException('The factory_class option is missing though it is required.');
        }

        $factoryClass = $this->settings['factory_class'];
        if (false === class_exists($factoryClass) || false === (new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)) {
            throw new \LogicException(sprintf('The factory_class option has to be valid class that implements "%s"', AmqpConnectionFactory::class));
        }

        /** @var AmqpConnectionFactory $factory */
        $factory = new $factoryClass([
            'host' => $this->settings['host'],
            'port' => $this->settings['port'],
            'user' => $this->settings['user'],
            'pass' => $this->settings['pass'],
            'vhost' => $this->settings['vhost'],
        ]);

        if ($factory instanceof DelayStrategyAware) {
            $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
        }

        // Nothing else in this class assigns $this->connectionFactory, so store the factory built above
        // and create the context from it; calling createContext() on the unset property would fail.
        $this->connectionFactory = $factory;
        $this->context = $factory->createContext();

        register_shutdown_function([$this, 'shutdown']);

        return $this->context;
    }
94
95
96
    /**
     * Always throws; this method no longer returns anything.
     *
     * @deprecated
     *
     * @throws \LogicException On every call
     */
    public function getChannel()
    {
        $reason = 'Not used any more';

        throw new \LogicException($reason);
    }
103
104
    /**
     * {@inheritdoc}
     *
     * The first call initializes every registered backend. The lookup then proceeds as follows:
     *  - when no queues are configured, the first backend whose type is "default" is returned, if any;
     *  - otherwise the first backend whose type equals $type is returned (for $type "all", a backend
     *    with an empty type also matches);
     *  - if nothing matched, the last backend whose type equals the default queue is returned.
     *
     * @throws BackendNotFoundException When no backend matches and none is bound to the default queue
     */
    public function getBackend($type)
    {
        if (!$this->backendsInitialized) {
            foreach ($this->backends as $entry) {
                $entry['backend']->initialize();
            }
            $this->backendsInitialized = true;
        }

        if (0 === count($this->queues)) {
            foreach ($this->backends as $entry) {
                if ('default' === $entry['type']) {
                    return $entry['backend'];
                }
            }
        }

        $fallback = null;

        foreach ($this->backends as $entry) {
            $isCatchAll = 'all' === $type && '' === $entry['type'];

            if ($isCatchAll || $entry['type'] === $type) {
                return $entry['backend'];
            }

            // Remember the default-queue backend, but keep scanning for an exact match.
            if ($entry['type'] === $this->defaultQueue) {
                $fallback = $entry['backend'];
            }
        }

        if (null !== $fallback) {
            return $fallback;
        }

        throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
    }
146
147
    /**
     * {@inheritdoc}
     *
     * Not available on this class: the method always throws.
     *
     * @throws \RuntimeException On every call
     */
    public function getIterator()
    {
        $message = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($message);
    }
156
157
    /**
     * {@inheritdoc}
     *
     * Not available on this class: both arguments are ignored and the method always throws.
     *
     * @throws \RuntimeException On every call
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        $reason = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($reason);
    }
166
167
    /**
     * {@inheritdoc}
     *
     * Obtains the AMQP context, fetches the queue list via getApiQueueStatus() and compares it with the
     * configured queues. The result is a Failure when a configured queue is not reported, when a reported
     * queue has zero consumers, or when any \Exception is raised along the way; otherwise a Success.
     */
    public function getStatus()
    {
        try {
            $this->getContext();
            $reportedQueues = $this->getApiQueueStatus();
            $matched = 0;
            $withoutConsumers = [];

            foreach ($this->queues as $configured) {
                foreach ($reportedQueues as $reported) {
                    if ($reported['name'] !== $configured['queue']) {
                        continue;
                    }

                    ++$matched;
                    if (0 === $reported['consumers']) {
                        $withoutConsumers[] = $configured['queue'];
                    }
                }
            }

            if (count($this->queues) !== $matched) {
                return new Failure(
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
                    .'Are the consumer commands running?'
                );
            }

            if ([] !== $withoutConsumers) {
                return new Failure(
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $withoutConsumers)
                );
            }
        } catch (\Exception $exception) {
            return new Failure($exception->getMessage());
        }

        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
    }
207
208
    /**
     * {@inheritdoc}
     *
     * Not available on this class: the method always throws.
     *
     * @throws \RuntimeException On every call
     */
    public function cleanup()
    {
        $message = 'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.';

        throw new \RuntimeException($message);
    }
217
218
    /**
     * Closes the AMQP context if one has been created; does nothing otherwise.
     *
     * getContext() registers this method as a PHP shutdown function.
     */
    public function shutdown()
    {
        if (!$this->context) {
            return;
        }

        $this->context->close();
    }
224
225
    /**
     * {@inheritdoc}
     *
     * No-op: this implementation has an empty body.
     */
    public function initialize()
    {
        // Nothing to do here.
    }
231
232
    /**
     * Lists the queues known to the broker by requesting "<console_url>/queues" from the rabbitmq
     * management api.
     *
     * The request authenticates with the configured "user" and "pass" settings and uses a connect timeout
     * of 3000 ms.
     *
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
     *
     * @throws \RuntimeException When the Guzzle\Http\Client class is not available, or when the response
     *                           body does not decode to an array
     *
     * @return array The JSON response decoded into associative arrays
     */
    protected function getApiQueueStatus()
    {
        if (false === class_exists('Guzzle\Http\Client')) {
            // The Guzzle\Http namespace ships with the guzzle/guzzle package (Guzzle 3);
            // guzzlehttp/guzzle provides GuzzleHttp\Client instead and would not satisfy this check.
            throw new \RuntimeException(
                'The guzzle http client library is required to run rabbitmq health checks. '
                .'Make sure to add guzzle/guzzle to your composer.json.'
            );
        }

        $client = new \Guzzle\Http\Client();
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
        $request->setAuth($this->settings['user'], $this->settings['pass']);

        $queues = json_decode($request->send()->getBody(true), true);

        // json_decode() yields null for malformed JSON; fail loudly rather than hand a non-array to callers
        // that iterate over the result.
        if (!is_array($queues)) {
            throw new \RuntimeException('The rabbitmq management api did not return a valid list of queues.');
        }

        return $queues;
    }
255
}
256