Completed
Pull Request — master (#276)
by Maksim
01:57
created

AMQPBackendDispatcher::getContext()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 38
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 38
rs 8.439
c 0
b 0
f 0
cc 6
eloc 22
nc 5
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <thomas.rabaix@sonata-project.org>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Interop\Amqp\AmqpContext;
17
use Interop\Amqp\AmqpConnectionFactory;
18
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
19
use Sonata\NotificationBundle\Model\MessageInterface;
20
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
21
use ZendDiagnostics\Result\Failure;
22
use ZendDiagnostics\Result\Success;
23
24
/**
 * Producer side of the rabbitmq backend.
 *
 * Selects the queue-specific backend for a message type and lazily creates
 * the AMQP context. Consumer-side operations (getIterator(), handle() and
 * cleanup()) are not supported by the dispatcher and always throw.
 */
class AMQPBackendDispatcher extends QueueBackendDispatcher
{
    /**
     * Connection settings. getContext() reads "factory_class", "host", "port",
     * "user", "pass" and "vhost"; getApiQueueStatus() reads "console_url",
     * "user" and "pass".
     *
     * @var array
     */
    protected $settings;

    /**
     * Factory instantiated from the "factory_class" setting by getContext().
     *
     * @var AmqpConnectionFactory
     */
    protected $connectionFactory;

    /**
     * Context created on the first getContext() call and closed by shutdown().
     *
     * @var AmqpContext
     */
    protected $context;

    /**
     * Whether getBackend() has already called initialize() on every backend.
     *
     * @var bool
     */
    protected $backendsInitialized = false;

    /**
     * @param array  $settings
     * @param array  $queues
     * @param string $defaultQueue
     * @param array  $backends
     */
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
    {
        parent::__construct($queues, $defaultQueue, $backends);

        $this->settings = $settings;
    }

    /**
     * Returns the AMQP context, creating it on first use.
     *
     * On the first call the configured factory class is validated and
     * instantiated, a RabbitMQ DLX delay strategy is attached when the factory
     * supports one, and shutdown() is registered as a PHP shutdown function.
     *
     * @throws \LogicException when "factory_class" is missing, is not an existing
     *                         class, or does not implement AmqpConnectionFactory
     *
     * @return AmqpContext
     */
    public function getContext()
    {
        if (!$this->context) {
            if (!array_key_exists('factory_class', $this->settings)) {
                throw new \LogicException('The factory_class option is missing though it is required.');
            }

            $factoryClass = $this->settings['factory_class'];
            if (
                !class_exists($factoryClass) ||
                !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
            ) {
                throw new \LogicException(sprintf(
                    'The factory_class option has to be valid class that implements "%s"',
                    AmqpConnectionFactory::class
                ));
            }

            /** @var AmqpConnectionFactory $factory */
            $factory = new $factoryClass([
                'host' => $this->settings['host'],
                'port' => $this->settings['port'],
                'user' => $this->settings['user'],
                'pass' => $this->settings['pass'],
                'vhost' => $this->settings['vhost'],
            ]);

            if ($factory instanceof DelayStrategyAware) {
                $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
            }

            // Store the configured factory before using it: the property is not
            // assigned anywhere else, so creating the context through it without
            // this assignment would dereference null.
            $this->connectionFactory = $factory;
            $this->context = $this->connectionFactory->createContext();

            register_shutdown_function([$this, 'shutdown']);
        }

        return $this->context;
    }

    /**
     * {@inheritdoc}
     *
     * The first call initializes every registered backend. When no queues are
     * configured, the backend whose type is "default" is returned if present.
     * Otherwise the first backend matching $type is returned (for "all", the
     * backend with an empty type also matches), falling back to the backend
     * registered for the default queue.
     *
     * @throws BackendNotFoundException when neither a matching nor a default backend exists
     */
    public function getBackend($type)
    {
        if (!$this->backendsInitialized) {
            foreach ($this->backends as $backend) {
                $backend['backend']->initialize();
            }
            $this->backendsInitialized = true;
        }

        $default = null;

        if (0 === count($this->queues)) {
            foreach ($this->backends as $backend) {
                if ('default' === $backend['type']) {
                    return $backend['backend'];
                }
            }
        }

        foreach ($this->backends as $backend) {
            if ('all' === $type && '' === $backend['type']) {
                return $backend['backend'];
            }

            if ($backend['type'] === $type) {
                return $backend['backend'];
            }

            // Remember the default-queue backend, but keep scanning for an exact match.
            if ($backend['type'] === $this->defaultQueue) {
                $default = $backend['backend'];
            }
        }

        if (null === $default) {
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
        }

        return $default;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException always; the dispatcher cannot be used to run a consumer
     */
    public function getIterator()
    {
        throw new \RuntimeException(
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
        );
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException always; the dispatcher cannot be used to run a consumer
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        throw new \RuntimeException(
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
        );
    }

    /**
     * {@inheritdoc}
     *
     * Creates the context if needed, then compares the configured queues with
     * the queue list reported by getApiQueueStatus(). The result is a Failure
     * when a configured queue is not reported, when a reported queue has zero
     * consumers, or when any exception is raised along the way.
     */
    public function getStatus()
    {
        try {
            $this->getContext();
            $output = $this->getApiQueueStatus();
            $checked = 0;
            $missingConsumers = [];

            foreach ($this->queues as $queue) {
                foreach ($output as $q) {
                    if ($q['name'] === $queue['queue']) {
                        ++$checked;
                        if (0 === $q['consumers']) {
                            $missingConsumers[] = $queue['queue'];
                        }
                    }
                }
            }

            if ($checked !== count($this->queues)) {
                return new Failure(
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
                    .'Are the consumer commands running?'
                );
            }

            if (count($missingConsumers) > 0) {
                return new Failure(
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
                );
            }
        } catch (\Exception $e) {
            // Any failure while connecting or querying is reported as a failed check.
            return new Failure($e->getMessage());
        }

        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException always; the dispatcher cannot be used to run a consumer
     */
    public function cleanup()
    {
        throw new \RuntimeException(
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
        );
    }

    /**
     * Closes the context if one was created. Registered by getContext() as a
     * PHP shutdown function.
     */
    public function shutdown()
    {
        if ($this->context) {
            $this->context->close();
        }
    }

    /**
     * {@inheritdoc}
     *
     * Intentionally empty: the dispatcher itself has nothing to initialize.
     */
    public function initialize()
    {
    }

    /**
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
     *
     * The request goes to "<console_url>/queues" with the configured user and
     * password and a 3000 ms connect timeout; the response body is JSON-decoded
     * into associative arrays.
     *
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
     *
     * @throws \RuntimeException when the Guzzle\Http\Client class is not available
     *
     * @return array
     */
    protected function getApiQueueStatus()
    {
        if (!class_exists('Guzzle\Http\Client')) {
            throw new \RuntimeException(
                'The guzzle http client library is required to run rabbitmq health checks. '
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
            );
        }

        $client = new \Guzzle\Http\Client();
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
        $request->setAuth($this->settings['user'], $this->settings['pass']);

        return json_decode($request->send()->getBody(true), true);
    }
}
253