AMQPBackendDispatcher::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 4
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Backend;
15
16
use Enqueue\AmqpTools\DelayStrategyAware;
17
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
18
use Guzzle\Http\Client as GuzzleClient;
19
use Interop\Amqp\AmqpConnectionFactory;
20
use Interop\Amqp\AmqpContext;
21
use Laminas\Diagnostics\Result\Failure;
22
use Laminas\Diagnostics\Result\Success;
23
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
24
use Sonata\NotificationBundle\Model\MessageInterface;
25
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26
27
/**
28
 * Producer side of the rabbitmq backend.
29
 */
30
final class AMQPBackendDispatcher extends QueueBackendDispatcher
31
{
32
    /**
33
     * @var array
34
     */
35
    private $settings;
36
37
    private $backendsInitialized = false;
38
39
    /**
40
     * @var AmqpConnectionFactory
41
     */
42
    private $connectionFactory;
43
44
    /**
45
     * @var AmqpContext
46
     */
47
    private $context;
48
49
    /**
50
     * @param string $defaultQueue
51
     */
52
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
53
    {
54
        parent::__construct($queues, $defaultQueue, $backends);
55
56
        $this->settings = $settings;
57
    }
58
59
    /**
60
     * @return AmqpContext
61
     */
62
    public function getContext()
63
    {
64
        if (!$this->context) {
65
            if (!\array_key_exists('factory_class', $this->settings)) {
66
                throw new \LogicException('The factory_class option is missing though it is required.');
67
            }
68
            $factoryClass = $this->settings['factory_class'];
69
            if (
70
                !class_exists($factoryClass) ||
71
                !(new \ReflectionClass($factoryClass))->implementsInterface(AmqpConnectionFactory::class)
72
            ) {
73
                throw new \LogicException(sprintf(
74
                    'The factory_class option "%s" has to be valid class that implements "%s"',
75
                    $factoryClass,
76
                    AmqpConnectionFactory::class
77
                ));
78
            }
79
80
            /* @var AmqpConnectionFactory $factory */
81
            $this->connectionFactory = $factory = new $factoryClass([
82
                'host' => $this->settings['host'],
83
                'port' => $this->settings['port'],
84
                'user' => $this->settings['user'],
85
                'pass' => $this->settings['pass'],
86
                'vhost' => $this->settings['vhost'],
87
            ]);
88
89
            if ($factory instanceof DelayStrategyAware) {
90
                $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
91
            }
92
93
            $this->context = $factory->createContext();
94
95
            register_shutdown_function([$this, 'shutdown']);
96
        }
97
98
        return $this->context;
99
    }
100
101
    /**
102
     * {@inheritdoc}
103
     */
104
    public function getBackend($type)
105
    {
106
        if (!$this->backendsInitialized) {
107
            foreach ($this->backends as $backend) {
108
                $backend['backend']->initialize();
109
            }
110
            $this->backendsInitialized = true;
111
        }
112
113
        $default = null;
114
115
        if (0 === \count($this->queues)) {
116
            foreach ($this->backends as $backend) {
117
                if ('default' === $backend['type']) {
118
                    return $backend['backend'];
119
                }
120
            }
121
        }
122
123
        foreach ($this->backends as $backend) {
124
            if ('all' === $type && '' === $backend['type']) {
125
                return $backend['backend'];
126
            }
127
128
            if ($backend['type'] === $type) {
129
                return $backend['backend'];
130
            }
131
132
            if ($backend['type'] === $this->defaultQueue) {
133
                $default = $backend['backend'];
134
            }
135
        }
136
137
        if (null === $default) {
138
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
139
        }
140
141
        return $default;
142
    }
143
144
    /**
145
     * {@inheritdoc}
146
     */
147
    public function getIterator(): void
148
    {
149
        throw new \RuntimeException(
150
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
151
        );
152
    }
153
154
    /**
155
     * {@inheritdoc}
156
     */
157
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher): void
158
    {
159
        throw new \RuntimeException(
160
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
161
        );
162
    }
163
164
    /**
165
     * {@inheritdoc}
166
     */
167
    public function getStatus()
168
    {
169
        try {
170
            $this->getContext();
171
            $output = $this->getApiQueueStatus();
172
            $checked = 0;
173
            $missingConsumers = [];
174
175
            foreach ($this->queues as $queue) {
176
                foreach ($output as $q) {
177
                    if ($q['name'] === $queue['queue']) {
178
                        ++$checked;
179
                        if (0 === $q['consumers']) {
180
                            $missingConsumers[] = $queue['queue'];
181
                        }
182
                    }
183
                }
184
            }
185
186
            if ($checked !== \count($this->queues)) {
187
                return new Failure(
188
                    'Not all queues for the available notification types registered in the rabbitmq broker. '
189
                    .'Are the consumer commands running?'
190
                );
191
            }
192
193
            if (\count($missingConsumers) > 0) {
194
                return new Failure(
195
                    'There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers)
196
                );
197
            }
198
        } catch (\Exception $e) {
199
            return new Failure($e->getMessage());
200
        }
201
202
        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
203
    }
204
205
    /**
206
     * {@inheritdoc}
207
     */
208
    public function cleanup(): void
209
    {
210
        throw new \RuntimeException(
211
            'You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.'
212
        );
213
    }
214
215
    public function shutdown(): void
216
    {
217
        if ($this->context) {
218
            $this->context->close();
219
        }
220
    }
221
222
    /**
223
     * {@inheritdoc}
224
     */
225
    public function initialize(): void
226
    {
227
    }
228
229
    /**
230
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
231
     *
232
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
233
     *
234
     * @return array
235
     */
236
    protected function getApiQueueStatus()
237
    {
238
        if (!class_exists(GuzzleClient::class)) {
239
            throw new \RuntimeException(
240
                'The guzzle http client library is required to run rabbitmq health checks. '
241
                .'Make sure to add guzzlehttp/guzzle to your composer.json.'
242
            );
243
        }
244
245
        $client = new GuzzleClient();
246
        $client->setConfig(['curl.options' => [CURLOPT_CONNECTTIMEOUT_MS => 3000]]);
247
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
248
        $request->setAuth($this->settings['user'], $this->settings['pass']);
249
250
        return json_decode($request->send()->getBody(true), true);
251
    }
252
}
253