Completed
Push — master ( ece95b...9e5e0a )
by Mihai
20s queued 14s
created

AMQPConnectionFactory::createConnection()   A

Complexity

Conditions 6
Paths 3

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 6

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 6
eloc 10
c 3
b 0
f 0
nc 3
nop 0
dl 0
loc 18
ccs 13
cts 13
cp 1
crap 6
rs 9.2222
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use PhpAmqpLib\Connection\AMQPSocketConnection;
8
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
9
10
class AMQPConnectionFactory
11
{
12
    /** @var string */
13
    private $class;
14
15
    /** @var array */
16
    private $parameters = array(
17
        'url'                => '',
18
        'host'               => 'localhost',
19
        'port'               => 5672,
20
        'user'               => 'guest',
21
        'password'           => 'guest',
22
        'vhost'              => '/',
23
        'connection_timeout' => 3,
24
        'read_write_timeout' => 3,
25
        'ssl_context'        => null,
26
        'keepalive'          => false,
27
        'heartbeat'          => 0,
28
        'hosts'              => []
29
    );
30
31
    /**
32
     * Constructor
33
     *
34
     * @param string                                $class              FQCN of AMQPConnection class to instantiate.
35
     * @param array                                 $parameters         Map containing parameters resolved by
36
     *                                                                  Extension.
37
     * @param ConnectionParametersProviderInterface $parametersProvider Optional service providing/overriding
38 10
     *                                                                  connection parameters.
39
     */
40
    public function __construct(
41
        $class,
42
        array $parameters,
43 10
        ConnectionParametersProviderInterface $parametersProvider = null
44 10
    ) {
45 10
        $this->class = $class;
46 10
        $this->parameters = array_merge($this->parameters, $parameters);
47 1
        $this->parameters = $this->parseUrl($this->parameters);
48 1
49
        foreach ($this->parameters['hosts'] as $key => $hostParameters) {
50
            if (!isset($hostParameters['url'])) {
51 10
                continue;
52 2
            }
53
54 10
            $this->parameters['hosts'][$key] = $this->parseUrl($hostParameters);
55
        }
56
57
        if (is_array($this->parameters['ssl_context'])) {
58
            $this->parameters['ssl_context'] = ! empty($this->parameters['ssl_context'])
59
                ? stream_context_create(array('ssl' => $this->parameters['ssl_context']))
60
                : null;
61 10
        }
62
        if ($parametersProvider) {
63 10
            $this->parameters = array_merge($this->parameters, $parametersProvider->getConnectionParameters());
64
        }
65 10
    }
66 1
67
    /**
68
     * Creates the appropriate connection using current parameters.
69 9
     *
70 2
     * @return AbstractConnection
71 2
     * @throws \Exception
72 2
     */
73 2
    public function createConnection()
74 2
    {
75 2
        if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) {
76
            $constructorArgs = array_values($this->parameters['constructor_args']);
77 2
            return new $this->class(...$constructorArgs);
78
        }
79 2
80 2
        $hosts = $this->parameters['hosts'] ?: [$this->parameters];
81 2
        $options = $this->parameters;
82 2
        unset($options['hosts']);
83 2
84
        if ($this->class == AMQPSocketConnection::class || is_subclass_of($this->class, AMQPSocketConnection::class)) {
85
            $options['read_timeout'] = $options['read_timeout'] ?? $this->parameters['read_write_timeout'];
86
            $options['write_timeout'] = $options['write_timeout'] ?? $this->parameters['read_write_timeout'];
87 7
        }
88 7
89 7
        // No need to unpack options, they will be handled inside connection classes
90 7
        return $this->class::create_connection($hosts, $options);
91 7
    }
92 7
93
    /**
94 7
     * Parses connection parameters from URL parameter.
95
     *
96 7
     * @param array $parameters
97 7
     *
98 7
     * @return array
99 7
     */
100 7
    private function parseUrl(array $parameters)
101 7
    {
102
        if (!$parameters['url']) {
103
            return $parameters;
104
        }
105
106
        $url = parse_url($parameters['url']);
107
108
        if ($url === false || !isset($url['scheme']) || !in_array($url['scheme'], ['amqp', 'amqps'], true)) {
109
            throw new InvalidConfigurationException('Malformed parameter "url".');
110
        }
111
112
        // See https://www.rabbitmq.com/uri-spec.html
113 10
        if (isset($url['host'])) {
114
            $parameters['host'] = urldecode($url['host']);
115 10
        }
116 7
        if (isset($url['port'])) {
117
            $parameters['port'] = (int)$url['port'];
118
        }
119 3
        if (isset($url['user'])) {
120
            $parameters['user'] = urldecode($url['user']);
121 3
        }
122
        if (isset($url['pass'])) {
123
            $parameters['password'] = urldecode($url['pass']);
124
        }
125
        if (isset($url['path'])) {
126 3
            $parameters['vhost'] = urldecode(ltrim($url['path'], '/'));
127 3
        }
128
129 3
        if (isset($url['query'])) {
130 3
            $query = array();
131
            parse_str($url['query'], $query);
132 3
            $parameters = array_merge($parameters, $query);
133 3
        }
134
135 3
        unset($parameters['url']);
136 3
137
        return $parameters;
138 3
    }
139
}
140