Completed
Push — master ( f55883...4285c1 )
by Thomas Mauro
13:12 queued 10:06
created

Channel   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 202
Duplicated Lines 0 %

Coupling/Cohesion

Components 4
Dependencies 4

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 18
c 1
b 0
f 0
lcom 4
cbo 4
dl 0
loc 202
ccs 49
cts 49
cp 1
rs 10

16 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 3
A registerExchange() 0 4 1
A registerQueue() 0 4 1
A getResource() 0 4 1
A setResource() 0 6 1
A isConnected() 0 4 1
A getChannelId() 0 4 1
A setQos() 0 6 1
A startTransaction() 0 6 1
A commitTransaction() 0 6 1
A rollbackTransaction() 0 6 1
A setConnection() 0 5 1
A getConnection() 0 4 1
A basicRecover() 0 6 1
A createQueue() 0 9 1
A createExchange() 0 9 1
1
<?php
2
3
namespace AMQPAL\Adapter\PhpAmqpLib;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use AMQPAL\Adapter\ChannelInterface;
7
use AMQPAL\Adapter\ConnectionInterface;
8
use AMQPAL\Adapter\ExchangeInterface;
9
use AMQPAL\Adapter\QueueInterface;
10
use AMQPAL\Options;
11
12
/**
 * Channel adapter backed by php-amqplib.
 *
 * Wraps an AMQPChannel resource and acts as a factory for queue and exchange
 * adapters, which are produced by cloning the registered prototypes.
 *
 * @package AMQPAL\Adapter\PhpAmqpLib
 */
class Channel implements ChannelInterface
{
    /**
     * Underlying php-amqplib channel; null until setResource() is called.
     *
     * @var AMQPChannel|null
     */
    protected $resource;

    /**
     * Connection this channel is attached to; null until setConnection() is called.
     *
     * @var Connection|null
     */
    protected $connection;

    /**
     * Prototype cloned by createExchange().
     *
     * @var Exchange
     */
    protected $exchangePrototype;

    /**
     * Prototype cloned by createQueue().
     *
     * @var Queue
     */
    protected $queuePrototype;

    /**
     * Channel constructor.
     *
     * Registers the given prototypes, falling back to freshly constructed
     * default instances when an argument is omitted or null.
     *
     * @param Exchange|null $exchangePrototype Exchange prototype, or null for a default Exchange.
     * @param Queue|null    $queuePrototype    Queue prototype, or null for a default Queue.
     */
    public function __construct(Exchange $exchangePrototype = null, Queue $queuePrototype = null)
    {
        $this->registerExchange($exchangePrototype ?: new Exchange());
        $this->registerQueue($queuePrototype ?: new Queue());
    }

    /**
     * Register the exchange prototype cloned by createExchange().
     *
     * @param Exchange $exchange
     */
    public function registerExchange(Exchange $exchange)
    {
        $this->exchangePrototype = $exchange;
    }

    /**
     * Register the queue prototype cloned by createQueue().
     *
     * @param Queue $queue
     */
    public function registerQueue(Queue $queue)
    {
        $this->queuePrototype = $queue;
    }

    /**
     * Get the underlying php-amqplib channel.
     *
     * @return AMQPChannel|null Null when no resource has been set yet.
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Set the underlying php-amqplib channel.
     *
     * @param AMQPChannel $resource
     * @return $this
     */
    public function setResource(AMQPChannel $resource)
    {
        $this->resource = $resource;

        return $this;
    }

    /**
     * Check the channel connection.
     *
     * Delegates to the attached connection. A channel that has not been
     * attached to a connection yet is reported as not connected.
     *
     * @return bool Indicates whether the channel is connected.
     */
    public function isConnected()
    {
        $connection = $this->getConnection();

        // Without this guard an unattached channel would trigger a fatal
        // "call to a member function on null" instead of answering false.
        return $connection !== null && $connection->isConnected();
    }

    /**
     * Return internal channel ID
     *
     * @return int
     */
    public function getChannelId()
    {
        return $this->getResource()->getChannelId();
    }

    /**
     * Set the window size and the number of messages to prefetch from the broker.
     *
     * The third argument of basic_qos() is always passed as false.
     *
     * @param int $prefetchSize  The window size, in octets, to prefetch.
     * @param int $prefetchCount The number of messages to prefetch.
     * @return $this
     */
    public function setQos($prefetchSize, $prefetchCount)
    {
        $this->getResource()->basic_qos($prefetchSize, $prefetchCount, false);

        return $this;
    }

    /**
     * Start a transaction.
     *
     * @return $this
     */
    public function startTransaction()
    {
        $this->getResource()->tx_select();

        return $this;
    }

    /**
     * Commit a pending transaction.
     *
     * @return $this
     */
    public function commitTransaction()
    {
        $this->getResource()->tx_commit();

        return $this;
    }

    /**
     * Rollback a transaction.
     *
     * @return $this
     */
    public function rollbackTransaction()
    {
        $this->getResource()->tx_rollback();

        return $this;
    }

    /**
     * Attach this channel to a connection.
     *
     * @param Connection $connection
     * @return $this
     */
    public function setConnection(Connection $connection)
    {
        $this->connection = $connection;

        return $this;
    }

    /**
     * Get the connection object in use
     *
     * @return Connection|null Null when no connection has been set yet.
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Redeliver unacknowledged messages.
     *
     * @param bool $requeue Forwarded unchanged to basic_recover().
     * @return $this
     */
    public function basicRecover($requeue = true)
    {
        $this->getResource()->basic_recover($requeue);

        return $this;
    }

    /**
     * Create a new queue
     *
     * Clones the registered queue prototype, so the prototype itself is
     * never bound to a channel or options, then binds the clone to this
     * channel and applies the given options.
     *
     * @param Options\QueueOptions $options
     * @return QueueInterface
     */
    public function createQueue(Options\QueueOptions $options)
    {
        $queue = clone $this->queuePrototype;

        $queue->setChannel($this);
        $queue->setOptions($options);

        return $queue;
    }

    /**
     * Create a new exchange
     *
     * Clones the registered exchange prototype, so the prototype itself is
     * never bound to a channel or options, then binds the clone to this
     * channel and applies the given options.
     *
     * @param Options\ExchangeOptions $options
     * @return ExchangeInterface
     */
    public function createExchange(Options\ExchangeOptions $options)
    {
        $exchange = clone $this->exchangePrototype;

        $exchange->setChannel($this);
        $exchange->setOptions($options);

        return $exchange;
    }
}
219