Completed
Push — master ( 4cd93a...e8f4ec )
by Thomas Mauro
10:51
created

Channel::createQueue()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 15
ccs 8
cts 8
cp 1
rs 9.4285
cc 2
eloc 9
nc 2
nop 2
crap 2
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPAL\Adapter\ChannelInterface;
6
use AMQPAL\Exception;
7
use AMQPAL\Options;
8
9
/**
 * Channel adapter built around an ext-amqp \AMQPChannel resource.
 *
 * The channel keeps one Exchange and one Queue prototype; createExchange()
 * and createQueue() clone the matching prototype, attach a resource, this
 * channel and the supplied options, and hand the clone back.
 *
 * @package AMQPAL\Adapter\AMQP
 */
class Channel implements ChannelInterface
{
    /**
     * Wrapped channel resource, assigned through setResource().
     *
     * @var \AMQPChannel
     */
    protected $resource;

    /**
     * Connection assigned through setConnection().
     *
     * @var Connection
     */
    protected $connection;

    /**
     * Template instance cloned by createExchange().
     *
     * @var Exchange
     */
    protected $exchangePrototype;

    /**
     * Template instance cloned by createQueue().
     *
     * @var Queue
     */
    protected $queuePrototype;

    /**
     * Register the exchange and queue prototypes.
     *
     * A missing prototype is replaced by a default-constructed instance.
     *
     * @param Exchange|null $exchangePrototype Prototype to clone for exchanges.
     * @param Queue|null    $queuePrototype    Prototype to clone for queues.
     */
    public function __construct(Exchange $exchangePrototype = null, Queue $queuePrototype = null)
    {
        if (null === $exchangePrototype) {
            $exchangePrototype = new Exchange();
        }
        $this->registerExchange($exchangePrototype);

        if (null === $queuePrototype) {
            $queuePrototype = new Queue();
        }
        $this->registerQueue($queuePrototype);
    }

    /**
     * Replace the prototype cloned by createExchange().
     *
     * @param Exchange $exchange
     */
    public function registerExchange(Exchange $exchange)
    {
        $this->exchangePrototype = $exchange;
    }

    /**
     * Replace the prototype cloned by createQueue().
     *
     * @param Queue $queue
     */
    public function registerQueue(Queue $queue)
    {
        $this->queuePrototype = $queue;
    }

    /**
     * Ask the wrapped resource whether the channel is connected.
     *
     * @return bool Whatever the resource's isConnected() reports.
     */
    public function isConnected()
    {
        $channel = $this->getResource();

        return $channel->isConnected();
    }

    /**
     * Get the wrapped channel resource.
     *
     * @return \AMQPChannel
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Set the wrapped channel resource.
     *
     * @param \AMQPChannel $resource
     * @return $this
     */
    public function setResource(\AMQPChannel $resource)
    {
        $this->resource = $resource;

        return $this;
    }

    /**
     * Get the channel ID reported by the wrapped resource.
     *
     * @return int
     */
    public function getChannelId()
    {
        $channel = $this->getResource();

        return $channel->getChannelId();
    }

    /**
     * Forward the prefetch window size and message count to the resource's qos().
     *
     * @param int $prefetchSize  The window size, in octets, to prefetch.
     * @param int $prefetchCount The number of messages to prefetch.
     * @return $this
     * @throws \AMQPConnectionException
     */
    public function setQos($prefetchSize, $prefetchCount)
    {
        $channel = $this->getResource();
        $channel->qos($prefetchSize, $prefetchCount);

        return $this;
    }

    /**
     * Begin a transaction on the wrapped resource.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function startTransaction()
    {
        $channel = $this->getResource();
        $channel->startTransaction();

        return $this;
    }

    /**
     * Commit the pending transaction on the wrapped resource.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function commitTransaction()
    {
        $channel = $this->getResource();
        $channel->commitTransaction();

        return $this;
    }

    /**
     * Roll back the transaction on the wrapped resource.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function rollbackTransaction()
    {
        $channel = $this->getResource();
        $channel->rollbackTransaction();

        return $this;
    }

    /**
     * Get the connection previously assigned with setConnection().
     *
     * @return Connection
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Assign the connection this channel reports through getConnection().
     *
     * @param Connection $connection
     * @return $this
     */
    public function setConnection(Connection $connection)
    {
        $this->connection = $connection;

        return $this;
    }

    /**
     * Forward a basic.recover request (redelivery of unacknowledged messages)
     * to the wrapped resource.
     *
     * @param bool $requeue Passed straight through to the resource.
     * @return $this
     */
    public function basicRecover($requeue = true)
    {
        $channel = $this->getResource();
        $channel->basicRecover($requeue);

        return $this;
    }

    /**
     * Build a Queue from the registered prototype.
     *
     * The clone receives the given resource when it is an \AMQPQueue, otherwise
     * a new one from createQueueResource(); then this channel and the options
     * are applied, in that order. $options is handed to setOptions() unchanged.
     *
     * @param Options\QueueOptions|\Traversable|array $options
     * @param \AMQPQueue|null                         $resource
     * @return Queue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @throws Exception\BadMethodCallException
     * @throws Exception\InvalidArgumentException
     */
    public function createQueue($options, $resource = null)
    {
        $instance = clone $this->queuePrototype;

        // Any value that is not an \AMQPQueue (including the null default)
        // falls back to a freshly created resource.
        $queueResource = $resource instanceof \AMQPQueue
            ? $resource
            : $this->createQueueResource();
        $instance->setResource($queueResource);

        $instance->setChannel($this);
        $instance->setOptions($options);

        return $instance;
    }

    /**
     * Create a new \AMQPQueue bound to the wrapped channel resource.
     *
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @codeCoverageIgnore
     */
    protected function createQueueResource()
    {
        $channel = $this->getResource();

        return new \AMQPQueue($channel);
    }

    /**
     * Build an Exchange from the registered prototype.
     *
     * The clone receives the given resource when it is an \AMQPExchange,
     * otherwise a new one from createExchangeResource(). Options that are not
     * already an Options\ExchangeOptions are wrapped in one before this channel
     * and the options are applied to the clone.
     *
     * @param Options\ExchangeOptions|\Traversable|array $options
     * @param \AMQPExchange|null                         $resource
     * @return Exchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     * @throws Exception\BadMethodCallException
     * @throws Exception\InvalidArgumentException
     */
    public function createExchange($options, $resource = null)
    {
        $instance = clone $this->exchangePrototype;

        // Any value that is not an \AMQPExchange (including the null default)
        // falls back to a freshly created resource.
        $exchangeResource = $resource instanceof \AMQPExchange
            ? $resource
            : $this->createExchangeResource();
        $instance->setResource($exchangeResource);

        // Options are normalised only after the resource is attached, so the
        // sequence of side effects matches the historical behaviour.
        $exchangeOptions = $options instanceof Options\ExchangeOptions
            ? $options
            : new Options\ExchangeOptions($options);

        $instance->setChannel($this);
        $instance->setOptions($exchangeOptions);

        return $instance;
    }

    /**
     * Create a new \AMQPExchange bound to the wrapped channel resource.
     *
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     * @codeCoverageIgnore
     */
    protected function createExchangeResource()
    {
        $channel = $this->getResource();

        return new \AMQPExchange($channel);
    }
}
266