Completed
Push — master ( ccbe64...bcccab )
by Thomas Mauro
05:56
created

Connection::createChannelResource()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 0
cts 0
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use Traversable;
6
use AMQPConnection;
7
use AMQPAL\Adapter\Exception;
8
use AMQPAL\Adapter\ConnectionInterface;
9
use AMQPAL\Adapter\AMQP\Options\ConnectionOptions;
10
11
/**
12
 * Class Connection
13
 *
14
 * @package AMQPAL\Adapter\AMQP
15
 */
16
class Connection implements ConnectionInterface
17
{
18
    /**
19
     * @var AMQPConnection
20
     */
21
    protected $resource;
22
    /**
23
     * @var ConnectionOptions
24
     */
25
    protected $options;
26
    /**
27
     * @var Channel
28
     */
29
    protected $channelPrototype;
30
31
    /**
32
     * Connection constructor.
33
     *
34
     * @param AMQPConnection|ConnectionOptions $connection
35
     * @param Channel $channelPrototype
36
     * @throws Exception\BadMethodCallException
37
     * @throws Exception\InvalidArgumentException
38
     */
39 20
    public function __construct($connection, Channel $channelPrototype = null)
40
    {
41 20
        if (!$connection instanceof AMQPConnection) {
42 8
            $this->setOptions($connection);
43 8
            $connection = $this->createResource($this->getOptions());
44
        }
45
46 20
        $this->setResource($connection);
47 20
        $this->registerChannel($channelPrototype ?: new Channel());
48 20
    }
49
50
    /**
51
     * @return ConnectionOptions
52
     */
53 15
    public function getOptions()
54
    {
55 15
        return $this->options;
56
    }
57
58
    /**
59
     * @param ConnectionOptions|Traversable|array $options
60
     * @return $this
61
     * @throws Exception\InvalidArgumentException
62
     * @throws Exception\BadMethodCallException
63
     */
64 15
    public function setOptions($options)
65
    {
66 15
        if (!$options instanceof ConnectionOptions) {
67 8
            $options = new ConnectionOptions($options);
68
        }
69 15
        $this->options = $options;
70
71 15
        return $this;
72
    }
73
74
    /**
75
     * @param ConnectionOptions $options
76
     * @return AMQPConnection
77
     */
78 8
    protected function createResource(ConnectionOptions $options)
79
    {
80
        $params = [
81 8
            'host'  => $options->getHost(),
82 8
            'port'  => $options->getPort(),
83 8
            'vhost' => $options->getVhost(),
84 8
            'login' => $options->getUsername(),
85 8
            'password' => $options->getPassword(),
86 8
            'read_timeout'  => $options->getReadTimeout(),
87 8
            'write_timeout' => $options->getWriteTimeout(),
88 8
            'connect_timeout' => $options->getConnectTimeout(),
89 8
            'channel_max' => $options->getChannelMax(),
90 8
            'frame_max' => $options->getFrameMax(),
91 8
            'heartbeat' => $options->getHeartbeat()
92
        ];
93
94 8
        return new AMQPConnection(array_filter($params, [$this, 'filterConnectionParam']));
95
    }
96
97
    /**
98
     * @param mixed $paramValue
99
     * @return bool
100
     */
101 8
    protected function filterConnectionParam($paramValue)
102
    {
103 8
        return null !== $paramValue;
104
    }
105
106
    /**
107
     * @param AMQPConnection $resource
108
     * @return $this
109
     */
110 20
    public function setResource(AMQPConnection $resource)
111
    {
112 20
        $this->resource = $resource;
113 20
        return $this;
114
    }
115
116
    /**
117
     * @return AMQPConnection
118
     */
119 17
    public function getResource()
120
    {
121 17
        return $this->resource;
122
    }
123
124
    /**
125
     * Establish a connection with the AMQP broker.
126
     *
127
     * @return $this
128
     * @throws \AMQPConnectionException
129
     */
130 9
    public function connect()
131
    {
132 9
        if ($this->getOptions()->isPersistent()) {
133 1
            $this->getResource()->pconnect();
134
        } else {
135 8
            $this->getResource()->connect();
136
        }
137
138 9
        return $this;
139
    }
140
141
    /**
142
     * Close any open connections and initiate a new one with the AMQP broker.
143
     *
144
     * @return $this
145
     */
146 2
    public function reconnect()
147
    {
148 2
        if ($this->getOptions()->isPersistent()) {
149 1
            $this->getResource()->preconnect();
0 ignored issues
show
Bug introduced by
The method preconnect() does not exist on AMQPConnection. Did you maybe mean connect()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
150
        } else {
151 1
            $this->getResource()->reconnect();
152
        }
153
154 2
        return $this;
155
    }
156
157
    /**
158
     * Closes the connection with the AMQP broker.
159
     *
160
     * @return $this
161
     */
162 9
    public function disconnect()
163
    {
164 9
        if ($this->getOptions()->isPersistent()) {
165
            // @todo: should disconnect on persistent connection?
166 1
            $this->getResource()->pdisconnect();
167
        } else {
168 8
            $this->getResource()->disconnect();
169
        }
170
171 9
        return $this;
172
    }
173
174
    /**
175
     * Check whether the connection to the AMQP broker is still valid.
176
     *
177
     * @return bool
178
     */
179 10
    public function isConnected()
180
    {
181 10
        return $this->getResource()->isConnected();
182
    }
183
184
    /**
185
     * @param \AMQPChannel $resource
186
     * @return Channel
187
     * @throws \AMQPConnectionException
188
     */
189 10
    public function createChannel($resource = null)
190
    {
191 10
        $channel = clone $this->channelPrototype;
192
193 10
        $channel->setConnection($this);
194
195 10
        if ($resource instanceof \AMQPChannel) {
196 1
            $channel->setResource($resource);
197
        } else {
198 9
            if (!$this->isConnected()) {
199 1
                $this->connect();
200
            }
201 9
            $channel->setResource($this->createChannelResource());
202
        }
203
204 10
        return $channel;
205
    }
206
207
    /**
208
     * @param Channel $channel
209
     */
210 20
    public function registerChannel(Channel $channel)
211
    {
212 20
        $this->channelPrototype = $channel;
213 20
    }
214
215
    /**
216
     * @return \AMQPChannel
217
     * @throws \AMQPConnectionException
218
     * @codeCoverageIgnore
219
     */
220
    protected function createChannelResource()
221
    {
222
        return new \AMQPChannel($this->getResource());
223
    }
224
}
225