Completed
Push — master ( ccbe64...bcccab )
by Thomas Mauro
05:56
created

Connection::createChannel()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 17
ccs 9
cts 9
cp 1
rs 9.4285
cc 3
eloc 10
nc 3
nop 1
crap 3
1
<?php
2
3
namespace AMQPAL\Adapter\PhpAmqpLib;
4
5
use AMQPAL\Adapter\Exception;
6
use AMQPAL\Adapter\ConnectionInterface;
7
use AMQPAL\Adapter\PhpAmqpLib\Options\ConnectionOptions;
8
use PhpAmqpLib\Connection\AbstractConnection;
9
use PhpAmqpLib\Channel\AMQPChannel as LibChannel;
10
use Traversable;
11
12
/**
13
 * Class Connection
14
 *
15
 * @package AMQPAL\Adapter\PhpAmqpLib
16
 */
17
class Connection implements ConnectionInterface
18
{
19
    /**
20
     * @var AbstractConnection
21
     */
22
    protected $resource;
23
    /**
24
     * @var ConnectionOptions
25
     */
26
    protected $options;
27
    /**
28
     * @var Channel
29
     */
30
    protected $channelPrototype;
31
32
    /**
33
     * Connection constructor.
34
     *
35
     * @param AbstractConnection|ConnectionOptions $connection
36
     * @param Channel $channelPrototype
37
     * @throws Exception\RuntimeException
38
     * @throws Exception\InvalidArgumentException
39
     * @throws Exception\BadMethodCallException
40
     */
41 18
    public function __construct($connection, Channel $channelPrototype = null)
42
    {
43 18
        if (!$connection instanceof AbstractConnection) {
44 8
            $this->setOptions($connection);
45 8
            $connection = $this->createResource();
46
        }
47
48 18
        $this->setResource($connection);
49 18
        $this->registerChannel($channelPrototype ?: new Channel());
50 18
    }
51
52
    /**
53
     * @return ConnectionOptions
54
     */
55 9
    public function getOptions()
56
    {
57 9
        return $this->options;
58
    }
59
60
    /**
61
     * @param ConnectionOptions|Traversable|array $options
62
     * @return $this
63
     * @throws \AMQPAL\Exception\InvalidArgumentException
64
     * @throws \AMQPAL\Exception\BadMethodCallException
65
     * @throws Exception\InvalidArgumentException
66
     * @throws Exception\BadMethodCallException
67
     */
68 10
    public function setOptions($options)
69
    {
70 10
        if (!$options instanceof ConnectionOptions) {
71 8
            $options = new ConnectionOptions($options);
72
        }
73 10
        $this->options = $options;
74
75 10
        return $this;
76
    }
77
78
    /**
79
     * @return AbstractConnection
80
     * @throws Exception\RuntimeException
81
     * @throws Exception\InvalidArgumentException
82
     */
83 8
    protected function createResource()
84
    {
85 8
        $factory = $this->getOptions()
86 8
            ->getConnectionFactoryFactory()
87 8
            ->createFactory($this->getOptions()->getType());
88 8
        return $factory->createConnection($this->getOptions());
89
    }
90
91
    /**
92
     * @param AbstractConnection $resource
93
     * @return $this
94
     */
95 18
    public function setResource(AbstractConnection $resource)
96
    {
97 18
        $this->resource = $resource;
98 18
        return $this;
99
    }
100
101
    /**
102
     * @return AbstractConnection
103
     */
104 10
    public function getResource()
105
    {
106 10
        return $this->resource;
107
    }
108
109
    /**
110
     * Establish a connection with the AMQP broker.
111
     *
112
     * @return $this
113
     * @throws Exception\RuntimeException
114
     * @throws Exception\InvalidArgumentException
115
     */
116 9
    public function connect()
117
    {
118 9
        if (!$this->resource->isConnected()) {
119 9
            $this->resource->reconnect();
120
        }
121
122 9
        return $this;
123
    }
124
125
    /**
126
     * Close any open connections and initiate a new one with the AMQP broker.
127
     *
128
     * @return $this
129
     */
130 1
    public function reconnect()
131
    {
132 1
        $this->resource->reconnect();
133
134 1
        return $this;
135
    }
136
137
    /**
138
     * Closes the connection with the AMQP broker.
139
     *
140
     * @return $this
141
     * @throws Exception\RuntimeException
142
     */
143 8
    public function disconnect()
144
    {
145 8
        $this->resource->close();
146
147 8
        return $this;
148
    }
149
150
    /**
151
     * Check whether the connection to the AMQP broker is still valid.
152
     *
153
     * @return bool
154
     * @throws Exception\RuntimeException
155
     */
156 10
    public function isConnected()
157
    {
158 10
        return $this->resource->isConnected();
159
    }
160
161
    /**
162
     * @param Channel $channel
163
     */
164 18
    public function registerChannel(Channel $channel)
165
    {
166 18
        $this->channelPrototype = $channel;
167 18
    }
168
169
    /**
170
     * @param LibChannel $resource
171
     * @return Channel
172
     * @throws Exception\RuntimeException
173
     * @throws Exception\InvalidArgumentException
174
     */
175 10
    public function createChannel($resource = null)
176
    {
177 10
        $channel = clone $this->channelPrototype;
178
179 10
        $channel->setConnection($this);
180
181 10
        if ($resource instanceof LibChannel) {
182 1
            $channel->setResource($resource);
183
        } else {
184 9
            if (!$this->isConnected()) {
185 1
                $this->connect();
186
            }
187 9
            $channel->setResource($this->getResource()->channel());
188
        }
189
190 10
        return $channel;
191
    }
192
}
193