Completed
Pull Request — 3.x (#375)
by
unknown
02:08
created

AMQPBackendDispatcher::getChannel()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 21
rs 9.584
c 0
b 0
f 0
cc 3
nc 3
nop 0
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Backend;
15
16
use Enqueue\AmqpTools\DelayStrategyAware;
17
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
18
use Guzzle\Http\Client as GuzzleClient;
19
use Interop\Amqp\AmqpConnectionFactory;
20
use Interop\Amqp\AmqpContext;
21
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
22
use Sonata\NotificationBundle\Model\MessageInterface;
23
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
24
use ZendDiagnostics\Result\Failure;
25
use ZendDiagnostics\Result\Success;
26
27
/**
28
 * Producer side of the rabbitmq backend.
29
 */
30
final class AMQPBackendDispatcher extends QueueBackendDispatcher
31
{
32
    /**
33
     * @var array
34
     */
35
    private $settings;
36
37
    private $backendsInitialized = false;
38
39
    /**
40
     * @var AmqpConnectionFactory
41
     */
42
    private $connectionFactory;
43
44
    /**
45
     * @var AmqpContext
46
     */
47
    private $context;
48
49
    /**
50
     * @param array  $settings
51
     * @param array  $queues
52
     * @param string $defaultQueue
53
     * @param array  $backends
54
     */
55
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
56
    {
57
        parent::__construct($queues, $defaultQueue, $backends);
58
59
        $this->settings = $settings;
60
    }
61
62
    /**
63
     * @return AmqpContext
64
     */
65
    public function getContext()
66
    {
67
        if (!$this->context) {
68
            if (!\array_key_exists('factory_class', $this->settings)) {
69
                throw new \LogicException('The factory_class option is missing though it is required.');
70
            }
71
            $factoryClass = $this->settings['factory_class'];
72
            if (
73
                !class_exists($factoryClass) ||
74
                !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
75
            ) {
76
                throw new \LogicException(sprintf(
77
                    'The factory_class option "%s" has to be valid class that implements "%s"',
78
                    $factoryClass,
79
                    AmqpConnectionFactory::class
80
                ));
81
            }
82
83
            /* @var AmqpConnectionFactory $factory */
84
            $this->connectionFactory = $factory = new $factoryClass([
85
                'host' => $this->settings['host'],
86
                'port' => $this->settings['port'],
87
                'user' => $this->settings['user'],
88
                'pass' => $this->settings['pass'],
89
                'vhost' => $this->settings['vhost'],
90
            ]);
91
92
            if ($factory instanceof DelayStrategyAware) {
93
                $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
94
            }
95
96
            $this->context = $factory->createContext();
97
98
            register_shutdown_function([$this, 'shutdown']);
99
        }
100
101
        return $this->context;
102
    }
103
104
    /**
105
     * {@inheritdoc}
106
     */
107
    public function getBackend($type)
108
    {
109
        if (!$this->backendsInitialized) {
110
            foreach ($this->backends as $backend) {
111
                $backend['backend']->initialize();
112
            }
113
            $this->backendsInitialized = true;
114
        }
115
116
        $default = null;
117
118
        if (0 === \count($this->queues)) {
119
            foreach ($this->backends as $backend) {
120
                if ('default' === $backend['type']) {
121
                    return $backend['backend'];
122
                }
123
            }
124
        }
125
126
        foreach ($this->backends as $backend) {
127
            if ('all' === $type && '' === $backend['type']) {
128
                return $backend['backend'];
129
            }
130
131
            if ($backend['type'] === $type) {
132
                return $backend['backend'];
133
            }
134
135
            if ($backend['type'] === $this->defaultQueue) {
136
                $default = $backend['backend'];
137
            }
138
        }
139
140
        if (null === $default) {
141
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
142
        }
143
144
        return $default;
145
    }
146
147
    /**
148
     * {@inheritdoc}
149
     */
150
    public function getIterator(): void
151
    {
152
        throw new \RuntimeException(
153
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
154
        );
155
    }
156
157
    /**
158
     * {@inheritdoc}
159
     */
160
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher): void
161
    {
162
        throw new \RuntimeException(
163
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
164
        );
165
    }
166
167
    /**
168
     * {@inheritdoc}
169
     */
170
    public function getStatus()
171
    {
172
        try {
173
            $this->getContext();
174
            $output = $this->getApiQueueStatus();
175
            $checked = 0;
176
            $missingConsumers = [];
177
178
            foreach ($this->queues as $queue) {
179
                foreach ($output as $q) {
180
                    if ($q['name'] === $queue['queue']) {
181
                        ++$checked;
182
                        if (0 === $q['consumers']) {
183
                            $missingConsumers[] = $queue['queue'];
184
                        }
185
                    }
186
                }
187
            }
188
189
            if ($checked !== \count($this->queues)) {
190
                return new Failure(
191
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
192
                    .'Are the consumer commands running?'
193
                );
194
            }
195
196
            if (\count($missingConsumers) > 0) {
197
                return new Failure(
198
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
199
                );
200
            }
201
        } catch (\Exception $e) {
202
            return new Failure($e->getMessage());
203
        }
204
205
        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
206
    }
207
208
    /**
209
     * {@inheritdoc}
210
     */
211
    public function cleanup(): void
212
    {
213
        throw new \RuntimeException(
214
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
215
        );
216
    }
217
218
    public function shutdown(): void
219
    {
220
        if ($this->context) {
221
            $this->context->close();
222
        }
223
    }
224
225
    /**
226
     * {@inheritdoc}
227
     */
228
    public function initialize(): void
229
    {
230
    }
231
232
    /**
233
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
234
     *
235
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
236
     *
237
     * @return array
238
     */
239
    protected function getApiQueueStatus()
240
    {
241
        if (!class_exists(GuzzleClient::class)) {
242
            throw new \RuntimeException(
243
                'The guzzle http client library is required to run rabbitmq health checks. '
244
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
245
            );
246
        }
247
248
        $client = new GuzzleClient();
249
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
250
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
251
        $request->setAuth($this->settings['user'], $this->settings['pass']);
252
253
        return json_decode($request->send()->getBody(true), true);
254
    }
255
}
256