RabbitMQConnector::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Noitran\Lumen\Horizon\Connectors;
4
5
use Noitran\Lumen\Horizon\RabbitMQQueue;
6
use Enqueue\AmqpTools\DelayStrategyAware;
7
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
8
use Illuminate\Contracts\Events\Dispatcher;
9
use Illuminate\Contracts\Queue\Queue;
10
use Illuminate\Queue\Events\WorkerStopping;
11
use Interop\Amqp\AmqpConnectionFactory;
12
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
13
use Interop\Amqp\AmqpContext;
14
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector as BaseConnector;
15
use ReflectionClass;
16
use LogicException;
17
18
/**
 * Queue connector that creates a RabbitMQQueue backed by an AMQP interop context.
 *
 * The concrete connection factory is selected through the "factory_class"
 * configuration option and must implement the AMQP interop connection
 * factory interface.
 */
class RabbitMQConnector extends BaseConnector
{
    /**
     * Event dispatcher used by connect() to listen for the worker shutdown event.
     *
     * @var Dispatcher
     */
    protected $dispatcher;

    /**
     * RabbitMQConnector constructor.
     *
     * @param Dispatcher $dispatcher Dispatcher forwarded to the parent connector and kept locally.
     */
    public function __construct(Dispatcher $dispatcher)
    {
        parent::__construct($dispatcher);

        // Keep our own reference: connect() registers a listener on it.
        // NOTE(review): presumably the parent's copy is not accessible from here - confirm.
        $this->dispatcher = $dispatcher;
    }

    /**
     * {@inheritdoc}
     *
     * Validates the configured factory class, builds the connection factory
     * from the connection/SSL options, opens an AMQP context and wraps it in
     * a RabbitMQQueue. The context is closed when a WorkerStopping event fires.
     *
     * @param array $config Connection configuration. Reads the keys "factory_class",
     *                      "dsn", "host", "port", "login", "password", "vhost" and the
     *                      nested "ssl_params" array.
     *
     * @return Queue
     *
     * @throws LogicException When "factory_class" is missing or is not a usable factory class.
     */
    public function connect(array $config): Queue
    {
        $factoryClass = $this->resolveFactoryClass($config);

        /** @var AmqpConnectionFactory $factory */
        $factory = new $factoryClass([
            'dsn' => $config['dsn'],
            'host' => $config['host'],
            'port' => $config['port'],
            'user' => $config['login'],
            'pass' => $config['password'],
            'vhost' => $config['vhost'],
            'ssl_on' => $config['ssl_params']['ssl_on'],
            'ssl_verify' => $config['ssl_params']['verify_peer'],
            'ssl_cacert' => $config['ssl_params']['cafile'],
            'ssl_cert' => $config['ssl_params']['local_cert'],
            'ssl_key' => $config['ssl_params']['local_key'],
            'ssl_passphrase' => $config['ssl_params']['passphrase'],
        ]);

        // Only factories that opt in to delay strategies receive one.
        if ($factory instanceof DelayStrategyAware) {
            $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
        }

        /** @var AmqpContext $context */
        $context = $factory->createContext();

        // Release the AMQP context when the queue worker is shutting down.
        $this->dispatcher->listen(WorkerStopping::class, function () use ($context): void {
            $context->close();
        });

        return new RabbitMQQueue($context, $config);
    }

    /**
     * Extracts and validates the "factory_class" option.
     *
     * The value must be a string naming an existing, instantiable class that
     * implements the AMQP interop connection factory interface. Non-string
     * values and non-instantiable (e.g. abstract) classes are rejected here
     * with a LogicException instead of surfacing later as a TypeError or a
     * fatal instantiation Error.
     *
     * @param array $config Connection configuration.
     *
     * @return string Fully-qualified name of the validated factory class.
     *
     * @throws LogicException When the option is missing or does not name a usable factory class.
     */
    private function resolveFactoryClass(array $config): string
    {
        if (false === array_key_exists('factory_class', $config)) {
            throw new LogicException('The factory_class option is missing though it is required.');
        }

        $factoryClass = $config['factory_class'];

        if (is_string($factoryClass) && class_exists($factoryClass)) {
            $reflection = new ReflectionClass($factoryClass);

            if ($reflection->implementsInterface(InteropAmqpConnectionFactory::class)
                && $reflection->isInstantiable()) {
                return $factoryClass;
            }
        }

        throw new LogicException(
            sprintf(
                'The factory_class option has to be valid class that implements "%s"',
                InteropAmqpConnectionFactory::class
            )
        );
    }
}
88