Passed
Push — master ( 9e5e0a...da4076 )
by Mihai
03:57 queued 01:37
created

AMQPConnectionFactory   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 128
Duplicated Lines 0 %

Test Coverage

Coverage 95.83%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 59
c 3
b 0
f 0
dl 0
loc 128
ccs 46
cts 48
cp 0.9583
rs 10
wmc 23

3 Methods

Rating   Name   Duplication   Size   Complexity  
A createConnection() 0 18 6
A __construct() 0 24 6
B parseUrl() 0 38 11
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use PhpAmqpLib\Connection\AMQPSocketConnection;
8
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
9
10
/**
 * Factory that resolves connection parameters (defaults, explicit configuration,
 * optional AMQP URL and optional parameters provider) and creates a connection
 * of the configured class.
 */
class AMQPConnectionFactory
{
    /** @var string FQCN of the connection class to instantiate. */
    private $class;

    /** @var array Connection parameters; the values below are the defaults. */
    private $parameters = [
        'url'                => '',
        'host'               => 'localhost',
        'port'               => 5672,
        'user'               => 'guest',
        'password'           => 'guest',
        'vhost'              => '/',
        'connection_timeout' => 3,
        'read_write_timeout' => 3,
        'ssl_context'        => null,
        'keepalive'          => false,
        'heartbeat'          => 0,
        'hosts'              => [],
    ];

    /**
     * Constructor
     *
     * Resolution order: built-in defaults, then $parameters, then values parsed
     * from the "url" parameter (top level and per entry of "hosts"), and finally
     * whatever the optional provider returns, which overrides everything else.
     *
     * @param string                                $class              FQCN of AMQPConnection class to instantiate.
     * @param array                                 $parameters         Map containing parameters resolved by
     *                                                                  Extension.
     * @param ConnectionParametersProviderInterface $parametersProvider Optional service providing/overriding
     *                                                                  connection parameters.
     *
     * @throws InvalidConfigurationException When a "url" parameter is malformed.
     */
    public function __construct(
        $class,
        array $parameters,
        ConnectionParametersProviderInterface $parametersProvider = null
    ) {
        $this->class = $class;
        $this->parameters = $this->parseUrl(array_merge($this->parameters, $parameters));

        // Each host entry may carry its own URL; entries without one are kept as given.
        foreach ($this->parameters['hosts'] as $key => $hostParameters) {
            if (!isset($hostParameters['url'])) {
                continue;
            }

            $this->parameters['hosts'][$key] = $this->parseUrl($hostParameters);
        }

        // An array "ssl_context" is turned into a stream context; an empty array yields a null context.
        if (is_array($this->parameters['ssl_context'])) {
            $this->parameters['context'] = !empty($this->parameters['ssl_context'])
                ? stream_context_create(['ssl' => $this->parameters['ssl_context']])
                : null;
        }

        // Provider values are merged last and are not passed through parseUrl().
        if ($parametersProvider) {
            $this->parameters = array_merge($this->parameters, $parametersProvider->getConnectionParameters());
        }
    }

    /**
     * Creates the appropriate connection using current parameters.
     *
     * When "constructor_args" is an array, its values are passed positionally to
     * the connection class constructor. Otherwise the class' static
     * create_connection() is called with the host list and the remaining options.
     *
     * @return AbstractConnection
     * @throws \Exception
     */
    public function createConnection()
    {
        if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) {
            return new $this->class(...array_values($this->parameters['constructor_args']));
        }

        // Without an explicit host list, the top-level parameters describe the single host.
        $hosts = $this->parameters['hosts'] ?: [$this->parameters];
        $options = $this->parameters;
        unset($options['hosts']);

        // is_a() with allow_string matches the socket connection class itself as well as
        // its subclasses, independent of how the configured class name is spelled.
        if (is_a($this->class, AMQPSocketConnection::class, true)) {
            $options['read_timeout'] = $options['read_timeout'] ?? $this->parameters['read_write_timeout'];
            $options['write_timeout'] = $options['write_timeout'] ?? $this->parameters['read_write_timeout'];
        }

        // No need to unpack options, they will be handled inside connection classes
        return $this->class::create_connection($hosts, $options);
    }

    /**
     * Parses connection parameters from URL parameter.
     *
     * Components present in the URL override the matching entries of $parameters,
     * query-string pairs are merged in as-is (values stay strings as produced by
     * parse_str()), and the "url" key is removed from the result. When "url" is
     * missing or empty, $parameters is returned untouched.
     *
     * @param array $parameters
     *
     * @return array
     *
     * @throws InvalidConfigurationException When the URL cannot be parsed or its scheme is not amqp/amqps.
     */
    private function parseUrl(array $parameters): array
    {
        if (empty($parameters['url'])) {
            return $parameters;
        }

        $url = parse_url($parameters['url']);

        if ($url === false || !isset($url['scheme']) || !in_array($url['scheme'], ['amqp', 'amqps'], true)) {
            throw new InvalidConfigurationException('Malformed parameter "url".');
        }

        // See https://www.rabbitmq.com/uri-spec.html
        // rawurldecode() is used because "+" is a literal character in these URI
        // components; urldecode() would turn it into a space and corrupt credentials.
        if (isset($url['host'])) {
            $parameters['host'] = rawurldecode($url['host']);
        }
        if (isset($url['port'])) {
            $parameters['port'] = (int) $url['port'];
        }
        if (isset($url['user'])) {
            $parameters['user'] = rawurldecode($url['user']);
        }
        if (isset($url['pass'])) {
            $parameters['password'] = rawurldecode($url['pass']);
        }
        if (isset($url['path'])) {
            $parameters['vhost'] = rawurldecode(ltrim($url['path'], '/'));
        }

        if (isset($url['query'])) {
            $query = [];
            parse_str($url['query'], $query);
            $parameters = array_merge($parameters, $query);
        }

        unset($parameters['url']);

        return $parameters;
    }
}
140