AMQPConnectionFactory::createConnection()   B
last analyzed

Complexity

Conditions 7
Paths 3

Size

Total Lines 41
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 7

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 7
eloc 34
c 3
b 0
f 0
nc 3
nop 0
dl 0
loc 41
ccs 30
cts 30
cp 1
crap 7
rs 8.4426
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
9
/**
 * Factory that instantiates the configured AMQP connection class.
 *
 * Connection parameters are assembled once, in the constructor, from the
 * built-in defaults, the parameters passed in, an optional "url" parameter
 * and an optional parameters provider. createConnection() then maps them onto
 * the positional constructor arguments of the connection class.
 */
class AMQPConnectionFactory
{
    /** @var string */
    private $class;

    /** @var array Default connection parameters; overridden in the constructor. */
    private $parameters = [
        'url'                => '',
        'host'               => 'localhost',
        'port'               => 5672,
        'user'               => 'guest',
        'password'           => 'guest',
        'vhost'              => '/',
        'connection_timeout' => 3,
        'read_write_timeout' => 3,
        'ssl_context'        => null,
        'keepalive'          => false,
        'heartbeat'          => 0,
    ];

    /**
     * Constructor
     *
     * Precedence, lowest to highest: built-in defaults, $parameters, values
     * parsed from the "url" parameter, values returned by $parametersProvider.
     *
     * @param string                                $class              FQCN of AMQPConnection class to instantiate.
     * @param array                                 $parameters         Map containing parameters resolved by
     *                                                                  Extension.
     * @param ConnectionParametersProviderInterface $parametersProvider Optional service providing/overriding
     *                                                                  connection parameters.
     *
     * @throws InvalidConfigurationException When the "url" parameter is malformed.
     */
    public function __construct(
        $class,
        array $parameters,
        ConnectionParametersProviderInterface $parametersProvider = null
    ) {
        $this->class = $class;
        $this->parameters = $this->parseUrl(array_merge($this->parameters, $parameters));

        if ($parametersProvider) {
            $this->parameters = array_merge($this->parameters, $parametersProvider->getConnectionParameters());
        }

        // Normalise only after the provider has been merged, so that an
        // "ssl_context" options array coming from the provider is converted
        // into a stream context as well. An empty array means "no SSL context".
        if (is_array($this->parameters['ssl_context'])) {
            $this->parameters['ssl_context'] = !empty($this->parameters['ssl_context'])
                ? stream_context_create(['ssl' => $this->parameters['ssl_context']])
                : null;
        }
    }

    /**
     * Creates the appropriate connection using current parameters.
     *
     * When a "constructor_args" array is present it is passed to the
     * connection constructor as-is. Otherwise the argument list is built from
     * the individual parameters; AMQPSocketConnection (or a subclass of it)
     * receives a different trailing argument list than any other class.
     *
     * @return AbstractConnection
     */
    public function createConnection()
    {
        $ref = new \ReflectionClass($this->class);
        $params = $this->parameters;

        if (isset($params['constructor_args']) && is_array($params['constructor_args'])) {
            return $ref->newInstanceArgs($params['constructor_args']);
        }

        // Arguments shared by both constructor signatures.
        $args = [
            $params['host'],
            $params['port'],
            $params['user'],
            $params['password'],
            $params['vhost'],
            false,      // insist
            'AMQPLAIN', // login_method
            null,       // login_response
            'en_US',    // locale
        ];

        // Compare the canonical class name reported by reflection so that a
        // FQCN configured with a leading backslash is still recognised.
        $socketClass = 'PhpAmqpLib\Connection\AMQPSocketConnection';
        $className = $ref->getName();

        if ($className === $socketClass || is_subclass_of($className, $socketClass)) {
            // Dedicated read/write timeouts fall back to "read_write_timeout".
            $args[] = isset($params['read_timeout']) ? $params['read_timeout'] : $params['read_write_timeout'];
            $args[] = $params['keepalive'];
            $args[] = isset($params['write_timeout']) ? $params['write_timeout'] : $params['read_write_timeout'];
            $args[] = $params['heartbeat'];
        } else {
            $args[] = $params['connection_timeout'];
            $args[] = $params['read_write_timeout'];
            $args[] = $params['ssl_context'];
            $args[] = $params['keepalive'];
            $args[] = $params['heartbeat'];
        }

        return $ref->newInstanceArgs($args);
    }

    /**
     * Parses connection parameters from URL parameter.
     *
     * Returns $parameters untouched when "url" is empty. Otherwise the URL
     * components override host, port, user, password and vhost, the query
     * string options are merged on top, and the "url" key is removed.
     *
     * @param array $parameters
     *
     * @return array
     *
     * @throws InvalidConfigurationException When the URL cannot be parsed or
     *                                       its scheme is not amqp/amqps.
     */
    private function parseUrl(array $parameters)
    {
        if (!$parameters['url']) {
            return $parameters;
        }

        $url = parse_url($parameters['url']);

        if ($url === false || !isset($url['scheme']) || !in_array($url['scheme'], ['amqp', 'amqps'], true)) {
            throw new InvalidConfigurationException('Malformed parameter "url".');
        }

        // See https://www.rabbitmq.com/uri-spec.html
        if (isset($url['host'])) {
            $parameters['host'] = urldecode($url['host']);
        }
        if (isset($url['port'])) {
            $parameters['port'] = (int)$url['port'];
        }
        if (isset($url['user'])) {
            $parameters['user'] = urldecode($url['user']);
        }
        if (isset($url['pass'])) {
            $parameters['password'] = urldecode($url['pass']);
        }
        if (isset($url['path'])) {
            $parameters['vhost'] = urldecode(ltrim($url['path'], '/'));
        }

        if (isset($url['query'])) {
            $query = [];
            parse_str($url['query'], $query);
            $parameters = array_merge($parameters, $this->normalizeQueryOptions($query));
        }

        unset($parameters['url']);

        return $parameters;
    }

    /**
     * Converts well-known query string options from strings to native types.
     *
     * parse_str() yields strings only, so "keepalive=false" would otherwise
     * be the truthy string 'false'. Options not listed here are left as-is.
     *
     * @param array $query Options as produced by parse_str().
     *
     * @return array
     */
    private function normalizeQueryOptions(array $query)
    {
        if (isset($query['keepalive'])) {
            $query['keepalive'] = filter_var($query['keepalive'], FILTER_VALIDATE_BOOLEAN);
        }

        $numericOptions = ['heartbeat', 'connection_timeout', 'read_write_timeout', 'read_timeout', 'write_timeout'];
        foreach ($numericOptions as $option) {
            // Non-numeric values are passed through unchanged.
            if (isset($query[$option]) && is_numeric($query[$option])) {
                $query[$option] += 0;
            }
        }

        return $query;
    }
}
153