AMQPConnectionFactory::createConnection()   A
last analyzed

Complexity

Conditions 6
Paths 3

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 6

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 6
eloc 10
c 3
b 0
f 0
nc 3
nop 0
dl 0
loc 18
ccs 9
cts 9
cp 1
crap 6
rs 9.2222
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use PhpAmqpLib\Connection\AMQPSocketConnection;
8
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
9
10
class AMQPConnectionFactory
11
{
12
    /** @var string */
13
    private $class;
14
15
    /** @var array */
16
    private $parameters = [
17
        'url'                => '',
18
        'host'               => 'localhost',
19
        'port'               => 5672,
20
        'user'               => 'guest',
21
        'password'           => 'guest',
22
        'vhost'              => '/',
23
        'connection_timeout' => 3,
24
        'read_write_timeout' => 3,
25
        'ssl_context'        => null,
26
        'keepalive'          => false,
27
        'heartbeat'          => 0,
28
        'hosts'              => [],
29
        'channel_rpc_timeout' => 0.0,
30
    ];
31
32
    /**
33
     * Constructor
34
     *
35
     * @param string                                $class              FQCN of AMQPConnection class to instantiate.
36
     * @param array                                 $parameters         Map containing parameters resolved by
37
     *                                                                  Extension.
38
     * @param ConnectionParametersProviderInterface $parametersProvider Optional service providing/overriding
39
     *                                                                  connection parameters.
40 14
     */
41
    public function __construct(
42
        $class,
43
        array $parameters,
44
        ?ConnectionParametersProviderInterface $parametersProvider = null
45 14
    ) {
46 14
        $this->class = $class;
47 14
        $this->parameters = array_merge($this->parameters, $parameters);
48
        $this->parameters = $this->parseUrl($this->parameters);
49 14
50 4
        foreach ($this->parameters['hosts'] as $key => $hostParameters) {
51 4
            if (!isset($hostParameters['url'])) {
52
                continue;
53
            }
54 3
55
            $this->parameters['hosts'][$key] = $this->parseUrl($hostParameters);
56
        }
57 14
58 2
        if ($parametersProvider) {
59 2
            $this->parameters = array_merge($this->parameters, $parametersProvider->getConnectionParameters());
60
        }
61
62 14
        if (is_array($this->parameters['ssl_context'])) {
63 2
            $this->parameters['context'] = !empty($this->parameters['ssl_context'])
64
                ? stream_context_create(['ssl' => $this->parameters['ssl_context']])
65 14
                : null;
66
        }
67
68
    }
69
70
    /**
71
     * Creates the appropriate connection using current parameters.
72
     *
73 14
     * @return AbstractConnection
74
     * @throws \Exception
75 14
     */
76 1
    public function createConnection()
77 1
    {
78
        if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) {
79
            $constructorArgs = array_values($this->parameters['constructor_args']);
80 13
            return new $this->class(...$constructorArgs);
81 13
        }
82 13
83
        $hosts = $this->parameters['hosts'] ?: [$this->parameters];
84 13
        $options = $this->parameters;
85 3
        unset($options['hosts']);
86 3
87
        if ($this->class == AMQPSocketConnection::class || is_subclass_of($this->class, AMQPSocketConnection::class)) {
88
            $options['read_timeout'] ??= $this->parameters['read_write_timeout'];
89
            $options['write_timeout'] ??= $this->parameters['read_write_timeout'];
90 13
        }
91
92
        // No need to unpack options, they will be handled inside connection classes
93
        return $this->class::create_connection($hosts, $options);
94
    }
95
96
    /**
97
     * Parses connection parameters from URL parameter.
98
     *
99
     * @param array $parameters
100 14
     *
101
     * @return array
102 14
     */
103 11
    private function parseUrl(array $parameters)
104
    {
105
        if (!$parameters['url']) {
106 6
            return $parameters;
107
        }
108 6
109
        $url = parse_url($parameters['url']);
110
111
        if ($url === false || !isset($url['scheme']) || !in_array($url['scheme'], ['amqp', 'amqps'], true)) {
112
            throw new InvalidConfigurationException('Malformed parameter "url".');
113 6
        }
114 6
115
        // See https://www.rabbitmq.com/uri-spec.html
116 6
        if (isset($url['host'])) {
117 6
            $parameters['host'] = urldecode($url['host']);
118
        }
119 6
        if (isset($url['port'])) {
120 6
            $parameters['port'] = (int)$url['port'];
121
        }
122 6
        if (isset($url['user'])) {
123 6
            $parameters['user'] = urldecode($url['user']);
124
        }
125 6
        if (isset($url['pass'])) {
126 6
            $parameters['password'] = urldecode($url['pass']);
127
        }
128
        if (isset($url['path'])) {
129 6
            $parameters['vhost'] = urldecode(ltrim($url['path'], '/'));
130 3
        }
131 3
132 3
        if (isset($url['query'])) {
133
            $query = [];
134
            parse_str($url['query'], $query);
135 6
            $parameters = array_merge($parameters, $query);
136
        }
137 6
138
        unset($parameters['url']);
139
140
        return $parameters;
141
    }
142
}
143