Channel::setResource()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
namespace AMQPAL\Adapter\PhpAmqpLib;
4
5
use AMQPAL\Adapter\ChannelInterface;
6
use AMQPAL\Adapter\ConnectionInterface;
7
use AMQPAL\Adapter\ExchangeInterface;
8
use AMQPAL\Adapter\QueueInterface;
9
use AMQPAL\Exception;
10
use AMQPAL\Options;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
13
/**
 * Class Channel
 *
 * Adapter around a php-amqplib AMQPChannel that fulfils ChannelInterface.
 * Queues and exchanges are produced by cloning the registered prototypes and
 * binding each clone to this channel.
 *
 * @package AMQPAL\Adapter\PhpAmqpLib
 */
class Channel implements ChannelInterface
{
    /**
     * Underlying php-amqplib channel; null until setResource() is called.
     *
     * @var AMQPChannel|null
     */
    protected $resource;

    /**
     * Connection this channel belongs to; null until setConnection() is called.
     *
     * @var ConnectionInterface|null
     */
    protected $connection;

    /**
     * Template cloned by createExchange().
     *
     * @var Exchange
     */
    protected $exchangePrototype;

    /**
     * Template cloned by createQueue().
     *
     * @var Queue
     */
    protected $queuePrototype;

    /**
     * Channel constructor.
     *
     * Registers the given prototypes, falling back to a freshly constructed
     * Exchange / Queue when an argument is omitted or null.
     *
     * @param Exchange|null $exchangePrototype Prototype cloned for new exchanges.
     * @param Queue|null    $queuePrototype    Prototype cloned for new queues.
     */
    public function __construct(Exchange $exchangePrototype = null, Queue $queuePrototype = null)
    {
        if ($exchangePrototype === null) {
            $exchangePrototype = new Exchange();
        }
        $this->registerExchange($exchangePrototype);

        if ($queuePrototype === null) {
            $queuePrototype = new Queue();
        }
        $this->registerQueue($queuePrototype);
    }

    /**
     * Replace the prototype that createExchange() clones.
     *
     * @param Exchange $exchange
     */
    public function registerExchange(Exchange $exchange)
    {
        $this->exchangePrototype = $exchange;
    }

    /**
     * Replace the prototype that createQueue() clones.
     *
     * @param Queue $queue
     */
    public function registerQueue(Queue $queue)
    {
        $this->queuePrototype = $queue;
    }

    /**
     * Check the channel connection.
     *
     * A channel that has not been given a connection yet is reported as not
     * connected instead of failing with a call on null.
     *
     * @return bool Indicates whether the channel is connected.
     */
    public function isConnected()
    {
        $connection = $this->getConnection();
        if ($connection === null) {
            return false;
        }

        // Delegate to the connection and hand back its answer untouched.
        return $connection->isConnected();
    }

    /**
     * Get the connection object in use.
     *
     * @return Connection|null The connection, or null when none has been set.
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Attach the connection this channel belongs to.
     *
     * @param Connection $connection
     * @return $this
     */
    public function setConnection(Connection $connection)
    {
        $this->connection = $connection;

        return $this;
    }

    /**
     * Return internal channel ID, as reported by the underlying resource.
     *
     * @return int
     */
    public function getChannelId()
    {
        $amqpChannel = $this->getResource();

        return $amqpChannel->getChannelId();
    }

    /**
     * Get the underlying php-amqplib channel.
     *
     * @return AMQPChannel|null The channel, or null when none has been set.
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Set the underlying php-amqplib channel.
     *
     * @param AMQPChannel $resource
     * @return $this
     */
    public function setResource(AMQPChannel $resource)
    {
        $this->resource = $resource;

        return $this;
    }

    /**
     * Set the window size and the number of messages to prefetch from the broker.
     *
     * @param int $prefetchSize  The window size, in octets, to prefetch.
     * @param int $prefetchCount The number of messages to prefetch.
     * @return $this
     */
    public function setQos($prefetchSize, $prefetchCount)
    {
        $amqpChannel = $this->getResource();
        // The third basic_qos() argument (php-amqplib's "global" flag) is always false here.
        $amqpChannel->basic_qos($prefetchSize, $prefetchCount, false);

        return $this;
    }

    /**
     * Start a transaction.
     *
     * @return $this
     */
    public function startTransaction()
    {
        $amqpChannel = $this->getResource();
        $amqpChannel->tx_select();

        return $this;
    }

    /**
     * Commit a pending transaction.
     *
     * @return $this
     */
    public function commitTransaction()
    {
        $amqpChannel = $this->getResource();
        $amqpChannel->tx_commit();

        return $this;
    }

    /**
     * Rollback a transaction.
     *
     * @return $this
     */
    public function rollbackTransaction()
    {
        $amqpChannel = $this->getResource();
        $amqpChannel->tx_rollback();

        return $this;
    }

    /**
     * Redeliver unacknowledged messages.
     *
     * @param bool $requeue Forwarded unchanged to the resource's basic_recover().
     * @return $this
     */
    public function basicRecover($requeue = true)
    {
        $amqpChannel = $this->getResource();
        $amqpChannel->basic_recover($requeue);

        return $this;
    }

    /**
     * Create a new queue.
     *
     * The queue is a clone of the registered queue prototype, bound to this
     * channel and then configured with the given options.
     *
     * @param Options\QueueOptions|\Traversable|array $options
     * @return QueueInterface
     * @throws Exception\BadMethodCallException
     * @throws Exception\InvalidArgumentException
     */
    public function createQueue($options)
    {
        $newQueue = clone $this->queuePrototype;

        // Bind the channel before applying options, as the original order does.
        $newQueue->setChannel($this);
        $newQueue->setOptions($options);

        return $newQueue;
    }

    /**
     * Create a new exchange.
     *
     * The exchange is a clone of the registered exchange prototype, bound to
     * this channel and then configured with the given options.
     *
     * @param Options\ExchangeOptions|\Traversable|array $options
     * @return ExchangeInterface
     * @throws Exception\BadMethodCallException
     * @throws Exception\InvalidArgumentException
     */
    public function createExchange($options)
    {
        $newExchange = clone $this->exchangePrototype;

        // Bind the channel before applying options, as the original order does.
        $newExchange->setChannel($this);
        $newExchange->setOptions($options);

        return $newExchange;
    }
}
224