Completed
Push — master ( f55883...4285c1 )
by Thomas Mauro
13:12 queued 10:06
created

Channel::getConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPAL\Adapter\ChannelInterface;
6
use AMQPAL\Options;
7
8
/**
 * Class Channel
 *
 * Adapter that wraps a php-amqp \AMQPChannel resource and builds Queue and
 * Exchange adapters bound to it by cloning the registered prototypes.
 *
 * @package AMQPAL\Adapter\AMQP
 */
class Channel implements ChannelInterface
{
    /**
     * Underlying extension channel; null until setResource() is called.
     *
     * @var \AMQPChannel|null
     */
    protected $resource;
    /**
     * Connection adapter this channel belongs to; null until setConnection() is called.
     *
     * @var Connection|null
     */
    protected $connection;
    /**
     * Prototype cloned by createExchange().
     *
     * @var Exchange
     */
    protected $exchangePrototype;
    /**
     * Prototype cloned by createQueue().
     *
     * @var Queue
     */
    protected $queuePrototype;

    /**
     * Channel constructor.
     *
     * @param Exchange|null $exchangePrototype Exchange prototype; a new Exchange is registered when omitted.
     * @param Queue|null    $queuePrototype    Queue prototype; a new Queue is registered when omitted.
     */
    public function __construct(Exchange $exchangePrototype = null, Queue $queuePrototype = null)
    {
        $this->registerExchange($exchangePrototype ?: new Exchange());
        $this->registerQueue($queuePrototype ?: new Queue());
    }

    /**
     * Register the prototype that createExchange() clones.
     *
     * @param Exchange $exchange
     */
    public function registerExchange(Exchange $exchange)
    {
        $this->exchangePrototype = $exchange;
    }

    /**
     * Register the prototype that createQueue() clones.
     *
     * @param Queue $queue
     */
    public function registerQueue(Queue $queue)
    {
        $this->queuePrototype = $queue;
    }

    /**
     * Get the underlying extension channel.
     *
     * @return \AMQPChannel|null Null when no resource has been set yet.
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Set the underlying extension channel.
     *
     * @param \AMQPChannel $resource
     * @return $this
     */
    public function setResource(\AMQPChannel $resource)
    {
        $this->resource = $resource;

        return $this;
    }

    /**
     * Check the channel connection.
     *
     * @return bool Indicates whether the channel is connected. False when no
     *              resource has been set on this channel.
     */
    public function isConnected()
    {
        $resource = $this->getResource();

        // Without a resource there is nothing to ask; report "not connected"
        // instead of failing on a method call on null.
        if (null === $resource) {
            return false;
        }

        return $resource->isConnected();
    }

    /**
     * Return internal channel ID
     *
     * @return int
     */
    public function getChannelId()
    {
        return $this->getResource()->getChannelId();
    }

    /**
     * Set the window size and the number of messages to prefetch from the broker.
     *
     * @param int $prefetchSize  The window size, in octets, to prefetch.
     * @param int $prefetchCount The number of messages to prefetch.
     * @return $this
     * @throws \AMQPConnectionException
     */
    public function setQos($prefetchSize, $prefetchCount)
    {
        $this->getResource()->qos($prefetchSize, $prefetchCount);

        return $this;
    }

    /**
     * Start a transaction.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function startTransaction()
    {
        $this->getResource()->startTransaction();

        return $this;
    }

    /**
     * Commit a pending transaction.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function commitTransaction()
    {
        $this->getResource()->commitTransaction();

        return $this;
    }

    /**
     * Rollback a transaction.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function rollbackTransaction()
    {
        $this->getResource()->rollbackTransaction();

        return $this;
    }

    /**
     * Set the connection object in use.
     *
     * @param Connection $connection
     * @return $this
     */
    public function setConnection(Connection $connection)
    {
        $this->connection = $connection;

        return $this;
    }

    /**
     * Get the connection object in use
     *
     * @return Connection|null Null when no connection has been set yet.
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Redeliver unacknowledged messages.
     *
     * @param bool $requeue Passed through unchanged to \AMQPChannel::basicRecover().
     * @return $this
     */
    public function basicRecover($requeue = true)
    {
        $this->getResource()->basicRecover($requeue);

        return $this;
    }

    /**
     * Create a Queue adapter bound to this channel.
     *
     * The queue is a clone of the registered prototype, so the prototype
     * itself is never modified.
     *
     * @param Options\QueueOptions $options
     * @param \AMQPQueue|null      $resource Existing queue resource to wrap. Any value that is
     *                                       not an \AMQPQueue is ignored and a new resource is
     *                                       created on this channel instead.
     * @return Queue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function createQueue(Options\QueueOptions $options, $resource = null)
    {
        $queue = clone $this->queuePrototype;

        if ($resource instanceof \AMQPQueue) {
            $queue->setResource($resource);
        } else {
            $queue->setResource($this->createQueueResource());
        }

        $queue->setChannel($this);
        $queue->setOptions($options);

        return $queue;
    }

    /**
     * Create an Exchange adapter bound to this channel.
     *
     * The exchange is a clone of the registered prototype, so the prototype
     * itself is never modified.
     *
     * @param Options\ExchangeOptions $options
     * @param \AMQPExchange|null      $resource Existing exchange resource to wrap. Any value that
     *                                          is not an \AMQPExchange is ignored and a new
     *                                          resource is created on this channel instead.
     * @return Exchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function createExchange(Options\ExchangeOptions $options, $resource = null)
    {
        $exchange = clone $this->exchangePrototype;

        if ($resource instanceof \AMQPExchange) {
            $exchange->setResource($resource);
        } else {
            $exchange->setResource($this->createExchangeResource());
        }

        $exchange->setChannel($this);
        $exchange->setOptions($options);

        return $exchange;
    }

    /**
     * Build a new extension queue on this channel's resource.
     *
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @codeCoverageIgnore
     */
    protected function createQueueResource()
    {
        return new \AMQPQueue($this->getResource());
    }

    /**
     * Build a new extension exchange on this channel's resource.
     *
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     * @codeCoverageIgnore
     */
    protected function createExchangeResource()
    {
        return new \AMQPExchange($this->getResource());
    }
}
257