Completed
Push — master ( 980655...c21cce )
by Grégoire
01:53
created

AMQPBackendDispatcher::getChannel()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 21
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 21
rs 9.3142
c 0
b 0
f 0
cc 3
eloc 10
nc 3
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Enqueue\AmqpTools\DelayStrategyAware;
15
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
16
use Guzzle\Http\Client as GuzzleClient;
17
use Interop\Amqp\AmqpConnectionFactory;
18
use Interop\Amqp\AmqpContext;
19
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
20
use Sonata\NotificationBundle\Model\MessageInterface;
21
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
22
use ZendDiagnostics\Result\Failure;
23
use ZendDiagnostics\Result\Success;
24
25
/**
26
 * Producer side of the rabbitmq backend.
27
 */
28
final class AMQPBackendDispatcher extends QueueBackendDispatcher
29
{
30
    /**
31
     * @var array
32
     */
33
    private $settings;
34
35
    private $backendsInitialized = false;
36
37
    /**
38
     * @var AmqpConnectionFactory
39
     */
40
    private $connectionFactory;
41
42
    /**
43
     * @var AmqpContext
44
     */
45
    private $context;
46
47
    /**
48
     * @param array  $settings
49
     * @param array  $queues
50
     * @param string $defaultQueue
51
     * @param array  $backends
52
     */
53
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
54
    {
55
        parent::__construct($queues, $defaultQueue, $backends);
56
57
        $this->settings = $settings;
58
    }
59
60
    /**
61
     * @return AmqpContext
62
     */
63
    public function getContext()
64
    {
65
        if (!$this->context) {
66
            if (!array_key_exists('factory_class', $this->settings)) {
67
                throw new \LogicException('The factory_class option is missing though it is required.');
68
            }
69
            $factoryClass = $this->settings['factory_class'];
70
            if (
71
                !class_exists($factoryClass) ||
72
                !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
73
            ) {
74
                throw new \LogicException(sprintf(
75
                    'The factory_class option "%s" has to be valid class that implements "%s"',
76
                    $factoryClass,
77
                    AmqpConnectionFactory::class
78
                ));
79
            }
80
81
            /* @var AmqpConnectionFactory $factory */
82
            $this->connectionFactory = $factory = new $factoryClass([
83
                'host' => $this->settings['host'],
84
                'port' => $this->settings['port'],
85
                'user' => $this->settings['user'],
86
                'pass' => $this->settings['pass'],
87
                'vhost' => $this->settings['vhost'],
88
            ]);
89
90
            if ($factory instanceof DelayStrategyAware) {
91
                $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
92
            }
93
94
            $this->context = $factory->createContext();
95
96
            register_shutdown_function([$this, 'shutdown']);
97
        }
98
99
        return $this->context;
100
    }
101
102
    /**
103
     * {@inheritdoc}
104
     */
105
    public function getBackend($type)
106
    {
107
        if (!$this->backendsInitialized) {
108
            foreach ($this->backends as $backend) {
109
                $backend['backend']->initialize();
110
            }
111
            $this->backendsInitialized = true;
112
        }
113
114
        $default = null;
115
116
        if (0 === count($this->queues)) {
117
            foreach ($this->backends as $backend) {
118
                if ('default' === $backend['type']) {
119
                    return $backend['backend'];
120
                }
121
            }
122
        }
123
124
        foreach ($this->backends as $backend) {
125
            if ('all' === $type && '' === $backend['type']) {
126
                return $backend['backend'];
127
            }
128
129
            if ($backend['type'] === $type) {
130
                return $backend['backend'];
131
            }
132
133
            if ($backend['type'] === $this->defaultQueue) {
134
                $default = $backend['backend'];
135
            }
136
        }
137
138
        if (null === $default) {
139
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
140
        }
141
142
        return $default;
143
    }
144
145
    /**
146
     * {@inheritdoc}
147
     */
148
    public function getIterator()
149
    {
150
        throw new \RuntimeException(
151
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
152
        );
153
    }
154
155
    /**
156
     * {@inheritdoc}
157
     */
158
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
159
    {
160
        throw new \RuntimeException(
161
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
162
        );
163
    }
164
165
    /**
166
     * {@inheritdoc}
167
     */
168
    public function getStatus()
169
    {
170
        try {
171
            $this->getContext();
172
            $output = $this->getApiQueueStatus();
173
            $checked = 0;
174
            $missingConsumers = [];
175
176
            foreach ($this->queues as $queue) {
177
                foreach ($output as $q) {
178
                    if ($q['name'] === $queue['queue']) {
179
                        ++$checked;
180
                        if (0 === $q['consumers']) {
181
                            $missingConsumers[] = $queue['queue'];
182
                        }
183
                    }
184
                }
185
            }
186
187
            if ($checked !== count($this->queues)) {
188
                return new Failure(
189
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
190
                    .'Are the consumer commands running?'
191
                );
192
            }
193
194
            if (count($missingConsumers) > 0) {
195
                return new Failure(
196
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
197
                );
198
            }
199
        } catch (\Exception $e) {
200
            return new Failure($e->getMessage());
201
        }
202
203
        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
204
    }
205
206
    /**
207
     * {@inheritdoc}
208
     */
209
    public function cleanup()
210
    {
211
        throw new \RuntimeException(
212
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
213
        );
214
    }
215
216
    public function shutdown()
217
    {
218
        if ($this->context) {
219
            $this->context->close();
220
        }
221
    }
222
223
    /**
224
     * {@inheritdoc}
225
     */
226
    public function initialize()
227
    {
228
    }
229
230
    /**
231
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
232
     *
233
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
234
     *
235
     * @return array
236
     */
237
    protected function getApiQueueStatus()
238
    {
239
        if (!class_exists(GuzzleClient::class)) {
240
            throw new \RuntimeException(
241
                'The guzzle http client library is required to run rabbitmq health checks. '
242
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
243
            );
244
        }
245
246
        $client = new GuzzleClient();
247
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
248
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
249
        $request->setAuth($this->settings['user'], $this->settings['pass']);
250
251
        return json_decode($request->send()->getBody(true), true);
252
    }
253
}
254