Completed
Pull Request — master (#276)
by Maksim
04:40
created

AMQPBackendDispatcher::getContext()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 32
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 32
rs 8.439
c 0
b 0
f 0
cc 6
eloc 18
nc 5
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Interop\Amqp\AmqpContext;
17
use Interop\Amqp\AmqpConnectionFactory;
18
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
19
use Sonata\NotificationBundle\Model\MessageInterface;
20
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
21
use ZendDiagnostics\Result\Failure;
22
use ZendDiagnostics\Result\Success;
23
24
/**
 * Producer side of the rabbitmq backend.
 *
 * Selects the backend responsible for a message type and lazily creates the
 * AMQP context on the first call to getContext(). The consumer-side methods
 * (getIterator(), handle(), cleanup()) always throw: a queue-specific backend
 * has to be used to run a consumer.
 */
class AMQPBackendDispatcher extends QueueBackendDispatcher
{
    /**
     * Connection settings. This class reads the keys "factory_class", "host",
     * "port", "user", "pass", "vhost" and "console_url".
     *
     * @var array
     */
    protected $settings;

    /**
     * Factory used to create the context; null until getContext() is called.
     *
     * @var AmqpConnectionFactory|null
     */
    protected $connectionFactory;

    /**
     * Lazily created context; null until getContext() is called.
     *
     * @var AmqpContext|null
     */
    protected $context;

    /**
     * Whether initialize() has already been called on the registered backends.
     *
     * @var bool
     */
    protected $backendsInitialized = false;

    /**
     * @param array  $settings
     * @param array  $queues
     * @param string $defaultQueue
     * @param array  $backends
     */
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
    {
        parent::__construct($queues, $defaultQueue, $backends);

        $this->settings = $settings;
    }

    /**
     * Returns the AMQP context, creating it on first use.
     *
     * On creation the connection factory is stored on the instance and
     * shutdown() is registered as a PHP shutdown function.
     *
     * @throws \LogicException when the "factory_class" setting is missing or
     *                         does not name a class implementing
     *                         AmqpConnectionFactory
     *
     * @return AmqpContext
     */
    public function getContext()
    {
        if (!$this->context) {
            // The context is created from the property, so the factory has to
            // be stored on the instance before it is used.
            $this->connectionFactory = $this->createConnectionFactory();
            $this->context = $this->connectionFactory->createContext();

            register_shutdown_function([$this, 'shutdown']);
        }

        return $this->context;
    }

    /**
     * {@inheritdoc}
     *
     * Lookup order:
     *  1. on the first call, every registered backend is initialized;
     *  2. when no queues are configured, the backend typed "default" wins;
     *  3. otherwise the first backend whose type equals $type is returned
     *     (for the type "all", a backend with an empty type also matches);
     *  4. as a fallback, the backend bound to the default queue is returned.
     *
     * @throws BackendNotFoundException when none of the rules above match
     */
    public function getBackend($type)
    {
        if (!$this->backendsInitialized) {
            foreach ($this->backends as $backend) {
                $backend['backend']->initialize();
            }
            $this->backendsInitialized = true;
        }

        $default = null;

        if (0 === count($this->queues)) {
            foreach ($this->backends as $backend) {
                if ('default' === $backend['type']) {
                    return $backend['backend'];
                }
            }
        }

        foreach ($this->backends as $backend) {
            if ('all' === $type && '' === $backend['type']) {
                return $backend['backend'];
            }

            if ($backend['type'] === $type) {
                return $backend['backend'];
            }

            // Remember the default-queue backend but keep scanning: a later
            // backend may still match $type exactly.
            if ($backend['type'] === $this->defaultQueue) {
                $default = $backend['backend'];
            }
        }

        if (null === $default) {
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
        }

        return $default;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException always; the dispatcher cannot be consumed from
     */
    public function getIterator()
    {
        throw new \RuntimeException(
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
        );
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException always; the dispatcher cannot handle messages
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        throw new \RuntimeException(
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
        );
    }

    /**
     * {@inheritdoc}
     *
     * Reports a Failure when the context cannot be created, when the
     * management api cannot be queried, when a configured queue is not listed
     * by the api, or when a listed queue has no consumer attached.
     */
    public function getStatus()
    {
        try {
            // Any exception raised while creating the context is turned into
            // a Failure by the catch block below.
            $this->getContext();
            $output = $this->getApiQueueStatus();
            $checked = 0;
            $missingConsumers = [];

            foreach ($this->queues as $queue) {
                foreach ($output as $q) {
                    if ($q['name'] === $queue['queue']) {
                        ++$checked;
                        if (0 === $q['consumers']) {
                            $missingConsumers[] = $queue['queue'];
                        }
                    }
                }
            }

            if ($checked !== count($this->queues)) {
                return new Failure(
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
                    .'Are the consumer commands running?'
                );
            }

            if (count($missingConsumers) > 0) {
                return new Failure(
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
                );
            }
        } catch (\Exception $e) {
            return new Failure($e->getMessage());
        }

        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException always; cleanup belongs to a queue-specific backend
     */
    public function cleanup()
    {
        throw new \RuntimeException(
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
        );
    }

    /**
     * Closes the context if one was created. Registered as a PHP shutdown
     * function by getContext().
     */
    public function shutdown()
    {
        if ($this->context) {
            $this->context->close();
        }
    }

    /**
     * {@inheritdoc}
     *
     * Intentionally empty: the registered backends are initialized by
     * getBackend().
     */
    public function initialize()
    {
    }

    /**
     * Queries the "<console_url>/queues" endpoint of the rabbitmq management
     * api to list the available queues.
     *
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
     *
     * @throws \RuntimeException when the Guzzle client class is not available
     *                           or the response is not a JSON array
     *
     * @return array
     */
    protected function getApiQueueStatus()
    {
        if (false === class_exists('Guzzle\Http\Client')) {
            // Guzzle\Http\Client is provided by the guzzle/guzzle package.
            throw new \RuntimeException(
                'The guzzle http client library is required to run rabbitmq health checks. '
                .'Make sure to add guzzle/guzzle to your composer.json.'
            );
        }

        $client = new \Guzzle\Http\Client();
        // Connect timeout of 3000 ms.
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
        $request->setAuth($this->settings['user'], $this->settings['pass']);

        $queues = json_decode($request->send()->getBody(true), true);

        // json_decode() yields null on malformed input; honour the documented
        // array return type instead of letting callers iterate a non-array.
        if (!is_array($queues)) {
            throw new \RuntimeException('The rabbitmq management api did not return a valid list of queues.');
        }

        return $queues;
    }

    /**
     * Instantiates the connection factory named by the "factory_class" setting.
     *
     * @throws \LogicException when "factory_class" is missing or does not name
     *                         a class implementing AmqpConnectionFactory
     *
     * @return AmqpConnectionFactory
     */
    private function createConnectionFactory()
    {
        if (!array_key_exists('factory_class', $this->settings)) {
            throw new \LogicException('The factory_class option is missing though it is required.');
        }

        $factoryClass = $this->settings['factory_class'];
        if (!class_exists($factoryClass)
            || !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
        ) {
            throw new \LogicException(sprintf('The factory_class option has to be valid class that implements "%s"', AmqpConnectionFactory::class));
        }

        /** @var AmqpConnectionFactory $factory */
        $factory = new $factoryClass([
            'host' => $this->settings['host'],
            'port' => $this->settings['port'],
            'user' => $this->settings['user'],
            'pass' => $this->settings['pass'],
            'vhost' => $this->settings['vhost'],
        ]);

        // Factories that are delay-strategy aware get the RabbitMQ DLX strategy.
        if ($factory instanceof DelayStrategyAware) {
            $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
        }

        return $factory;
    }
}
247